Merge pull request #15196 from AspirinSJL/load_reporter

Add load reporter for server load reporting
pull/15684/head
Juanli Shen 7 years ago committed by GitHub
commit 8bda53ee64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      BUILD
  2. 71
      src/cpp/server/load_reporter/constants.h
  3. 36
      src/cpp/server/load_reporter/get_cpu_stats.h
  4. 45
      src/cpp/server/load_reporter/get_cpu_stats_linux.cc
  5. 45
      src/cpp/server/load_reporter/get_cpu_stats_macos.cc
  6. 40
      src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
  7. 55
      src/cpp/server/load_reporter/get_cpu_stats_windows.cc
  8. 65
      src/cpp/server/load_reporter/load_data_store.cc
  9. 34
      src/cpp/server/load_reporter/load_data_store.h
  10. 498
      src/cpp/server/load_reporter/load_reporter.cc
  11. 225
      src/cpp/server/load_reporter/load_reporter.h
  12. 14
      src/proto/grpc/lb/v1/BUILD
  13. 180
      src/proto/grpc/lb/v1/load_reporter.proto
  14. 34
      test/cpp/server/load_reporter/BUILD
  15. 61
      test/cpp/server/load_reporter/get_cpu_stats_test.cc
  16. 4
      test/cpp/server/load_reporter/load_data_store_test.cc
  17. 498
      test/cpp/server/load_reporter/load_reporter_test.cc

56
BUILD

@ -29,8 +29,8 @@ package(
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
"grpc_proto_plugin",
"grpc_generate_one_off_targets",
"grpc_proto_plugin",
)
config_setting(
@ -675,8 +675,8 @@ grpc_cc_library(
"src/core/lib/channel/channel_stack.cc",
"src/core/lib/channel/channel_stack_builder.cc",
"src/core/lib/channel/channel_trace.cc",
"src/core/lib/channel/channelz_registry.cc",
"src/core/lib/channel/channelz.cc",
"src/core/lib/channel/channelz_registry.cc",
"src/core/lib/channel/connected_channel.cc",
"src/core/lib/channel/handshaker.cc",
"src/core/lib/channel/handshaker_factory.cc",
@ -823,8 +823,8 @@ grpc_cc_library(
"src/core/lib/channel/channel_stack.h",
"src/core/lib/channel/channel_stack_builder.h",
"src/core/lib/channel/channel_trace.h",
"src/core/lib/channel/channelz_registry.h",
"src/core/lib/channel/channelz.h",
"src/core/lib/channel/channelz_registry.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/handshaker.h",
@ -1308,6 +1308,7 @@ grpc_cc_library(
"src/cpp/server/load_reporter/load_data_store.cc",
],
hdrs = [
"src/cpp/server/load_reporter/constants.h",
"src/cpp/server/load_reporter/load_data_store.h",
],
language = "c++",
@ -1316,6 +1317,43 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "lb_get_cpu_stats",
srcs = [
"src/cpp/server/load_reporter/get_cpu_stats_linux.cc",
"src/cpp/server/load_reporter/get_cpu_stats_macos.cc",
"src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc",
"src/cpp/server/load_reporter/get_cpu_stats_windows.cc",
],
hdrs = [
"src/cpp/server/load_reporter/get_cpu_stats.h",
],
language = "c++",
deps = [
"grpc++",
],
)
grpc_cc_library(
name = "lb_load_reporter",
srcs = [
"src/cpp/server/load_reporter/load_reporter.cc",
],
hdrs = [
"src/cpp/server/load_reporter/constants.h",
"src/cpp/server/load_reporter/load_reporter.h",
],
external_deps = [
"opencensus-stats",
],
language = "c++",
deps = [
"lb_get_cpu_stats",
"lb_load_data_store",
"//src/proto/grpc/lb/v1:load_reporter_proto",
],
)
grpc_cc_library(
name = "grpc_resolver_dns_native",
srcs = [
@ -1739,11 +1777,11 @@ grpc_cc_library(
"src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h",
"src/core/tsi/alts/handshaker/transport_security_common_api.h",
],
public_hdrs = GRPC_SECURE_PUBLIC_HDRS,
external_deps = [
"nanopb",
],
language = "c++",
public_hdrs = GRPC_SECURE_PUBLIC_HDRS,
deps = [
"alts_proto",
"gpr",
@ -1992,33 +2030,33 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_opencensus_plugin",
srcs = [
"src/cpp/ext/filters/census/client_filter.cc",
"src/cpp/ext/filters/census/server_filter.cc",
"src/cpp/ext/filters/census/channel_filter.cc",
"src/cpp/ext/filters/census/client_filter.cc",
"src/cpp/ext/filters/census/context.cc",
"src/cpp/ext/filters/census/grpc_context.cc",
"src/cpp/ext/filters/census/grpc_plugin.cc",
"src/cpp/ext/filters/census/measures.cc",
"src/cpp/ext/filters/census/rpc_encoding.cc",
"src/cpp/ext/filters/census/server_filter.cc",
"src/cpp/ext/filters/census/views.cc",
],
hdrs = [
"include/grpcpp/opencensus.h",
"src/cpp/ext/filters/census/client_filter.h",
"src/cpp/ext/filters/census/server_filter.h",
"src/cpp/ext/filters/census/channel_filter.h",
"src/cpp/ext/filters/census/client_filter.h",
"src/cpp/ext/filters/census/context.h",
"src/cpp/ext/filters/census/grpc_plugin.h",
"src/cpp/ext/filters/census/measures.h",
"src/cpp/ext/filters/census/rpc_encoding.h",
"src/cpp/ext/filters/census/server_filter.h",
],
language = "c++",
external_deps = [
"absl-base",
"absl-time",
"opencensus-trace",
"opencensus-stats",
],
language = "c++",
deps = [
":census",
":grpc++",

@ -0,0 +1,71 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
#include <grpc/impl/codegen/port_platform.h>
namespace grpc {
namespace load_reporter {
constexpr size_t kLbIdLength = 8;
constexpr size_t kIpv4AddressLength = 8;
constexpr size_t kIpv6AddressLength = 32;
constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
// Call statuses.
constexpr char kCallStatusOk[] = "OK";
constexpr char kCallStatusServerError[] = "5XX";
constexpr char kCallStatusClientError[] = "4XX";
// Tag keys.
constexpr char kTagKeyToken[] = "token";
constexpr char kTagKeyHost[] = "host";
constexpr char kTagKeyUserId[] = "user_id";
constexpr char kTagKeyStatus[] = "status";
constexpr char kTagKeyMetricName[] = "metric_name";
// Measure names.
constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
// View names.
constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
constexpr char kViewOtherCallMetricCount[] =
"grpc.io/lb_view/other_call_metric_count";
constexpr char kViewOtherCallMetricValue[] =
"grpc.io/lb_view/other_call_metric_value";
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H

@ -0,0 +1,36 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
#include <grpc/impl/codegen/port_platform.h>
#include <utility>
namespace grpc {
namespace load_reporter {
// Reads the CPU stats (in a pair of busy and total numbers) from the system.
// The units of the stats should be the same.
std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H

@ -0,0 +1,45 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_LINUX
#include <cstdio>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
namespace grpc {
namespace load_reporter {
std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
uint64_t busy = 0, total = 0;
FILE* fp;
fp = fopen("/proc/stat", "r");
uint64_t user, nice, system, idle;
fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle);
fclose(fp);
busy = user + nice + system;
total = busy + idle;
return std::make_pair(busy, total);
}
} // namespace load_reporter
} // namespace grpc
#endif // GPR_LINUX

@ -0,0 +1,45 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_APPLE
#include <mach/mach.h>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
namespace grpc {
namespace load_reporter {
std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
uint64_t busy = 0, total = 0;
host_cpu_load_info_data_t cpuinfo;
mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
(host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
}
return std::make_pair(busy, total);
}
} // namespace load_reporter
} // namespace grpc
#endif // GPR_APPLE

@ -0,0 +1,40 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
#include <grpc/support/log.h>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
namespace grpc {
namespace load_reporter {
std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
uint64_t busy = 0, total = 0;
gpr_log(GPR_ERROR,
"Platforms other than Linux, Windows, and MacOS are not supported.");
return std::make_pair(busy, total);
}
} // namespace load_reporter
} // namespace grpc
#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)

@ -0,0 +1,55 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include <windows.h>
#include <cstdint>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
namespace grpc {
namespace load_reporter {
namespace {
uint64_t FiletimeToInt(const FILETIME& ft) {
ULARGE_INTEGER i;
i.LowPart = ft.dwLowDateTime;
i.HighPart = ft.dwHighDateTime;
return i.QuadPart;
}
} // namespace
std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
uint64_t busy = 0, total = 0;
FILETIME idle, kernel, user;
if (GetSystemTimes(&idle, &kernel, &user) != 0) {
total = FiletimeToInt(kernel) + FiletimeToInt(user);
busy = total - FiletimeToInt(idle);
}
return std::make_pair(busy, total);
}
} // namespace load_reporter
} // namespace grpc
#endif // GPR_WINDOWS

@ -16,11 +16,15 @@
*
*/
#include <grpc/impl/codegen/port_platform.h>
#include <stdio.h>
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
@ -73,6 +77,67 @@ const typename C::value_type* RandomElement(const C& container) {
} // namespace
LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
grpc::string user_id)
: user_id_(std::move(user_id)) {
GPR_ASSERT(client_ip_and_token.size() >= 2);
int ip_hex_size;
GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
&ip_hex_size) == 1);
GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
ip_hex_size == kIpv6AddressLength);
size_t cur_pos = 2;
client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
cur_pos += ip_hex_size;
if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
lb_id_ = kInvalidLbId;
lb_tag_ = "";
} else {
lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
}
}
grpc::string LoadRecordKey::GetClientIpBytes() const {
if (client_ip_hex_.empty()) {
return "";
} else if (client_ip_hex_.size() == kIpv4AddressLength) {
uint32_t ip_bytes;
if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
gpr_log(GPR_ERROR,
"Can't parse client IP (%s) from a hex string to an integer.",
client_ip_hex_.c_str());
return "";
}
ip_bytes = grpc_htonl(ip_bytes);
return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
sizeof(ip_bytes));
} else if (client_ip_hex_.size() == kIpv6AddressLength) {
uint32_t ip_bytes[4];
for (size_t i = 0; i < 4; ++i) {
if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
ip_bytes + i) != 1) {
gpr_log(
GPR_ERROR,
"Can't parse client IP part (%s) from a hex string to an integer.",
client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
return "";
}
ip_bytes[i] = grpc_htonl(ip_bytes[i]);
}
return grpc::string(reinterpret_cast<const char*>(ip_bytes),
sizeof(ip_bytes));
} else {
GPR_UNREACHABLE_CODE(return "");
}
}
LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
double total_metric_value) {
call_metrics_.emplace(std::move(metric_name),
CallMetricValue(num_calls, total_metric_value));
}
void PerBalancerStore::MergeRow(const LoadRecordKey& key,
const LoadRecordValue& value) {
// During suspension, the load data received will be dropped.

@ -28,12 +28,11 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
#include "src/cpp/server/load_reporter/constants.h"
namespace grpc {
namespace load_reporter {
constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
constexpr uint8_t kLbIdLen = 8;
// The load data storage is organized in hierarchy. The LoadDataStore is the
// top-level data store. In LoadDataStore, for each host we keep a
// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
@ -68,13 +67,16 @@ class CallMetricValue {
// The key of a load record.
class LoadRecordKey {
public:
explicit LoadRecordKey(grpc::string lb_id, grpc::string lb_tag,
grpc::string user_id, grpc::string client_ip_hex)
LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id,
grpc::string client_ip_hex)
: lb_id_(std::move(lb_id)),
lb_tag_(std::move(lb_tag)),
user_id_(std::move(user_id)),
client_ip_hex_(std::move(client_ip_hex)) {}
// Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id);
grpc::string ToString() const {
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
@ -86,6 +88,9 @@ class LoadRecordKey {
user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
}
// Gets the client IP bytes in network order (i.e., big-endian).
grpc::string GetClientIpBytes() const;
// Getters.
const grpc::string& lb_id() const { return lb_id_; }
const grpc::string& lb_tag() const { return lb_tag_; }
@ -119,8 +124,8 @@ class LoadRecordKey {
class LoadRecordValue {
public:
explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
uint64_t error_count = 0, double bytes_sent = 0,
double bytes_recv = 0, double latency_ms = 0)
uint64_t error_count = 0, uint64_t bytes_sent = 0,
uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
: start_count_(start_count),
ok_count_(ok_count),
error_count_(error_count),
@ -128,6 +133,9 @@ class LoadRecordValue {
bytes_recv_(bytes_recv),
latency_ms_(latency_ms) {}
LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
double total_metric_value);
void MergeFrom(const LoadRecordValue& other) {
start_count_ += other.start_count_;
ok_count_ += other.ok_count_;
@ -164,9 +172,9 @@ class LoadRecordValue {
uint64_t start_count() const { return start_count_; }
uint64_t ok_count() const { return ok_count_; }
uint64_t error_count() const { return error_count_; }
double bytes_sent() const { return bytes_sent_; }
double bytes_recv() const { return bytes_recv_; }
double latency_ms() const { return latency_ms_; }
uint64_t bytes_sent() const { return bytes_sent_; }
uint64_t bytes_recv() const { return bytes_recv_; }
uint64_t latency_ms() const { return latency_ms_; }
const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
const {
return call_metrics_;
@ -176,9 +184,9 @@ class LoadRecordValue {
uint64_t start_count_ = 0;
uint64_t ok_count_ = 0;
uint64_t error_count_ = 0;
double bytes_sent_ = 0;
double bytes_recv_ = 0;
double latency_ms_ = 0;
uint64_t bytes_sent_ = 0;
uint64_t bytes_recv_ = 0;
uint64_t latency_ms_ = 0;
std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
};

@ -0,0 +1,498 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/impl/codegen/port_platform.h>
#include <stdint.h>
#include <stdio.h>
#include <chrono>
#include <ctime>
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
#include "opencensus/stats/internal/set_aggregation_window.h"
namespace grpc {
namespace load_reporter {
CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
return GetCpuStatsImpl();
}
CensusViewProvider::CensusViewProvider()
: tag_key_token_(::opencensus::stats::TagKey::Register(kTagKeyToken)),
tag_key_host_(::opencensus::stats::TagKey::Register(kTagKeyHost)),
tag_key_user_id_(::opencensus::stats::TagKey::Register(kTagKeyUserId)),
tag_key_status_(::opencensus::stats::TagKey::Register(kTagKeyStatus)),
tag_key_metric_name_(
::opencensus::stats::TagKey::Register(kTagKeyMetricName)) {
// One view related to starting a call.
auto vd_start_count =
::opencensus::stats::ViewDescriptor()
.set_name(kViewStartCount)
.set_measure(kMeasureStartCount)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.set_description(
"Delta count of calls started broken down by <token, host, "
"user_id>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
// Four views related to ending a call.
// If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
// measure), it's infeasible to prepare fake data for testing. That's because
// the OpenCensus API to make up view data will add the input data as separate
// measurements instead of setting the data values directly.
auto vd_end_count =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndCount))
.set_measure((kMeasureEndCount))
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_status_)
.set_description(
"Delta count of calls ended broken down by <token, host, "
"user_id, status>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
auto vd_end_bytes_sent =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndBytesSent))
.set_measure((kMeasureEndBytesSent))
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_status_)
.set_description(
"Delta sum of bytes sent broken down by <token, host, user_id, "
"status>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
auto vd_end_bytes_received =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndBytesReceived))
.set_measure((kMeasureEndBytesReceived))
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_status_)
.set_description(
"Delta sum of bytes received broken down by <token, host, "
"user_id, status>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
auto vd_end_latency_ms =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndLatencyMs))
.set_measure((kMeasureEndLatencyMs))
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_status_)
.set_description(
"Delta sum of latency in ms broken down by <token, host, "
"user_id, status>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
// Two views related to other call metrics.
auto vd_metric_call_count =
::opencensus::stats::ViewDescriptor()
.set_name((kViewOtherCallMetricCount))
.set_measure((kMeasureOtherCallMetric))
.set_aggregation(::opencensus::stats::Aggregation::Count())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_metric_name_)
.set_description(
"Delta count of calls broken down by <token, host, user_id, "
"metric_name>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
auto vd_metric_value =
::opencensus::stats::ViewDescriptor()
.set_name((kViewOtherCallMetricValue))
.set_measure((kMeasureOtherCallMetric))
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
.add_column(tag_key_user_id_)
.add_column(tag_key_metric_name_)
.set_description(
"Delta sum of call metric value broken down "
"by <token, host, user_id, metric_name>.");
::opencensus::stats::SetAggregationWindow(
::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
}
double CensusViewProvider::GetRelatedViewDataRowDouble(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
auto it_row = it_vd->second.double_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.double_data().end());
return it_row->second;
}
CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
for (const auto& p : view_descriptor_map()) {
const grpc::string& view_name = p.first;
const ::opencensus::stats::ViewDescriptor& vd = p.second;
// We need to use pair's piecewise ctor here, otherwise the deleted copy
// ctor of View will be called.
view_map_.emplace(std::piecewise_construct,
std::forward_as_tuple(view_name),
std::forward_as_tuple(vd));
}
}
CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
ViewDataMap view_data_map;
for (auto& p : view_map_) {
const grpc::string& view_name = p.first;
::opencensus::stats::View& view = p.second;
if (view.IsValid()) {
view_data_map.emplace(view_name, view.GetData());
gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
view_name.c_str());
} else {
gpr_log(
GPR_DEBUG,
"[CVP %p] Can't fetch view data because view is invalid (view: %s).",
this, view_name.c_str());
}
}
return view_data_map;
}
grpc::string LoadReporter::GenerateLbId() {
while (true) {
if (next_lb_id_ > UINT32_MAX) {
gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
this);
return "";
}
int64_t lb_id = next_lb_id_++;
// Overflow should never happen.
GPR_ASSERT(lb_id >= 0);
// Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
char buf[kLbIdLength + 1];
snprintf(buf, sizeof(buf), "%08lx", lb_id);
grpc::string lb_id_str(buf, kLbIdLength);
// The client may send requests with LB ID that has never been allocated
// by this load reporter. Those IDs are tracked and will be skipped when
// we generate a new ID.
if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
return lb_id_str;
}
}
}
::grpc::lb::v1::LoadBalancingFeedback
LoadReporter::GenerateLoadBalancingFeedback() {
std::unique_lock<std::mutex> lock(feedback_mu_);
auto now = std::chrono::system_clock::now();
// Discard records outside the window until there is only one record
// outside the window, which is used as the base for difference.
while (feedback_records_.size() > 1 &&
!IsRecordInWindow(feedback_records_[1], now)) {
feedback_records_.pop_front();
}
if (feedback_records_.size() < 2) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
// Find the longest range with valid ends.
LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
LoadBalancingFeedbackRecord* newest =
&feedback_records_[feedback_records_.size() - 1];
while (newest > oldest &&
(newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
// A zero limit means that the system info reading was failed, so these
// records can't be used to calculate CPU utilization.
if (newest->cpu_limit == 0) --newest;
if (oldest->cpu_limit == 0) ++oldest;
}
if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
newest->cpu_limit == oldest->cpu_limit) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
uint64_t rpcs = 0;
uint64_t errors = 0;
for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
// Because these two numbers are counters, the oldest record shouldn't be
// included.
rpcs += p->rpcs;
errors += p->errors;
}
double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
std::chrono::duration<double> duration_seconds =
newest->end_time - oldest->end_time;
lock.unlock();
::grpc::lb::v1::LoadBalancingFeedback feedback;
feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
feedback.set_calls_per_second(
static_cast<float>(rpcs / duration_seconds.count()));
feedback.set_errors_per_second(
static_cast<float>(errors / duration_seconds.count()));
return feedback;
}
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
LoadReporter::GenerateLoads(const grpc::string& hostname,
const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_);
auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
GPR_ASSERT(assigned_stores != nullptr);
GPR_ASSERT(!assigned_stores->empty());
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
for (PerBalancerStore* per_balancer_store : *assigned_stores) {
GPR_ASSERT(!per_balancer_store->IsSuspended());
if (!per_balancer_store->load_record_map().empty()) {
for (const auto& p : per_balancer_store->load_record_map()) {
const auto& key = p.first;
const auto& value = p.second;
auto load = loads.Add();
load->set_load_balance_tag(key.lb_tag());
load->set_user_id(key.user_id());
load->set_client_ip_address(key.GetClientIpBytes());
load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
load->set_num_calls_finished_without_error(
static_cast<int64_t>(value.ok_count()));
load->set_num_calls_finished_with_error(
static_cast<int64_t>(value.error_count()));
load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
load->set_total_bytes_received(
static_cast<int64_t>(value.bytes_recv()));
load->mutable_total_latency()->set_seconds(
static_cast<int64_t>(value.latency_ms() / 1000));
load->mutable_total_latency()->set_nanos(
(static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
for (const auto& p : value.call_metrics()) {
const grpc::string& metric_name = p.first;
const CallMetricValue& metric_value = p.second;
auto call_metric_data = load->add_metric_data();
call_metric_data->set_metric_name(metric_name);
call_metric_data->set_num_calls_finished_with_metric(
metric_value.num_calls());
call_metric_data->set_total_metric_value(
metric_value.total_metric_value());
}
if (per_balancer_store->lb_id() != lb_id) {
// This per-balancer store is an orphan assigned to this receiving
// balancer.
AttachOrphanLoadId(load, *per_balancer_store);
}
}
per_balancer_store->ClearLoadRecordMap();
}
if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
auto load = loads.Add();
load->set_num_calls_in_progress(
per_balancer_store->GetNumCallsInProgressForReport());
if (per_balancer_store->lb_id() != lb_id) {
// This per-balancer store is an orphan assigned to this receiving
// balancer.
AttachOrphanLoadId(load, *per_balancer_store);
}
}
}
return loads;
}
void LoadReporter::AttachOrphanLoadId(
::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
if (per_balancer_store.lb_id() == kInvalidLbId) {
load->set_load_key_unknown(true);
} else {
load->set_load_key_unknown(false);
load->mutable_orphaned_load_identifier()->set_load_key(
per_balancer_store.load_key());
load->mutable_orphaned_load_identifier()->set_load_balancer_id(
per_balancer_store.lb_id());
}
}
void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
CpuStatsProvider::CpuStatsSample cpu_stats;
if (cpu_stats_provider_ != nullptr) {
cpu_stats = cpu_stats_provider_->GetCpuStats();
} else {
// This will make the load balancing feedback generation a no-op.
cpu_stats = {0, 0};
}
std::unique_lock<std::mutex> lock(feedback_mu_);
feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
cpu_stats.first, cpu_stats.second);
}
void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
const grpc::string& lb_id,
const grpc::string& load_key) {
std::lock_guard<std::mutex> lock(store_mu_);
load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
gpr_log(GPR_INFO,
"[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
}
void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_);
load_data_store_.ReportStreamClosed(hostname, lb_id);
gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
hostname.c_str(), lb_id.c_str());
}
void LoadReporter::ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewStartCount);
if (it != view_data_map.end()) {
// Note that the data type for any Sum view is double, whatever the data
// type of the original measure.
for (const auto& p : it->second.double_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t start_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
const grpc::string& host = tag_values[1];
const grpc::string& user_id = tag_values[2];
LoadRecordKey key(client_ip_and_token, user_id);
LoadRecordValue value = LoadRecordValue(start_count);
{
std::unique_lock<std::mutex> lock(store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}
}
}
void LoadReporter::ProcessViewDataCallEnd(
const CensusViewProvider::ViewDataMap& view_data_map) {
uint64_t total_end_count = 0;
uint64_t total_error_count = 0;
auto it = view_data_map.find(kViewEndCount);
if (it != view_data_map.end()) {
// Note that the data type for any Sum view is double, whatever the data
// type of the original measure.
for (const auto& p : it->second.double_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t end_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
const grpc::string& host = tag_values[1];
const grpc::string& user_id = tag_values[2];
const grpc::string& status = tag_values[3];
// This is due to a bug reported internally of Java server load reporting
// implementation.
// TODO(juanlishen): Check whether this situation happens in OSS C++.
if (client_ip_and_token.size() == 0) {
gpr_log(GPR_DEBUG,
"Skipping processing Opencensus record with empty "
"client_ip_and_token tag.");
continue;
}
LoadRecordKey key(client_ip_and_token, user_id);
const uint64_t bytes_sent =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
tag_values);
const uint64_t bytes_received =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewEndBytesReceived,
sizeof(kViewEndBytesReceived) - 1, tag_values);
const uint64_t latency_ms =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
tag_values);
uint64_t ok_count = 0;
uint64_t error_count = 0;
total_end_count += end_count;
if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
ok_count = end_count;
} else {
error_count = end_count;
total_error_count += end_count;
}
LoadRecordValue value = LoadRecordValue(
0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
{
std::unique_lock<std::mutex> lock(store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}
}
AppendNewFeedbackRecord(total_end_count, total_error_count);
}
void LoadReporter::ProcessViewDataOtherCallMetrics(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewOtherCallMetricCount);
if (it != view_data_map.end()) {
for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const int64_t num_calls = p.second;
const grpc::string& client_ip_and_token = tag_values[0];
const grpc::string& host = tag_values[1];
const grpc::string& user_id = tag_values[2];
const grpc::string& metric_name = tag_values[3];
LoadRecordKey key(client_ip_and_token, user_id);
const double total_metric_value =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewOtherCallMetricValue,
sizeof(kViewOtherCallMetricValue) - 1, tag_values);
LoadRecordValue value = LoadRecordValue(
metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
{
std::unique_lock<std::mutex> lock(store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}
}
}
void LoadReporter::FetchAndSample() {
gpr_log(GPR_DEBUG,
"[LR %p] Starts fetching Census view data and sampling LB feedback "
"record.",
this);
CensusViewProvider::ViewDataMap view_data_map =
census_view_provider_->FetchViewData();
ProcessViewDataCallStart(view_data_map);
ProcessViewDataCallEnd(view_data_map);
ProcessViewDataOtherCallMetrics(view_data_map);
}
} // namespace load_reporter
} // namespace grpc

@ -0,0 +1,225 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include <chrono>
#include <deque>
#include <vector>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
#include "opencensus/stats/stats.h"
namespace grpc {
namespace load_reporter {
// The interface to get the Census stats. Abstracted for mocking.
class CensusViewProvider {
public:
// Maps from the view name to the view data.
using ViewDataMap =
std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
// Maps from the view name to the view descriptor.
using ViewDescriptorMap =
std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;
CensusViewProvider();
virtual ~CensusViewProvider() = default;
// Fetches the view data accumulated since last fetching, and returns it as a
// map from the view name to the view data.
virtual ViewDataMap FetchViewData() = 0;
// A helper function that gets a row with the input tag values from the view
// data. Only used when we know that row must exist because we have seen a row
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
double static GetRelatedViewDataRowDouble(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);
protected:
const ViewDescriptorMap& view_descriptor_map() const {
return view_descriptor_map_;
}
private:
ViewDescriptorMap view_descriptor_map_;
// Tag keys.
::opencensus::stats::TagKey tag_key_token_;
::opencensus::stats::TagKey tag_key_host_;
::opencensus::stats::TagKey tag_key_user_id_;
::opencensus::stats::TagKey tag_key_status_;
::opencensus::stats::TagKey tag_key_metric_name_;
};
// The default implementation fetches the real stats from Census.
class CensusViewProviderDefaultImpl : public CensusViewProvider {
public:
CensusViewProviderDefaultImpl();
ViewDataMap FetchViewData() override;
private:
std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
};
// The interface to get the CPU stats. Abstracted for mocking.
class CpuStatsProvider {
public:
// The used and total amounts of CPU usage.
using CpuStatsSample = std::pair<uint64_t, uint64_t>;
virtual ~CpuStatsProvider() = default;
// Gets the cumulative used CPU and total CPU resource.
virtual CpuStatsSample GetCpuStats() = 0;
};
// The default implementation reads CPU jiffies from the system to calculate CPU
// utilization.
class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
public:
CpuStatsSample GetCpuStats() override;
};
// Maintains all the load data and load reporting streams.
class LoadReporter {
public:
// TODO(juanlishen): Allow config for providers from users.
LoadReporter(uint32_t feedback_sample_window_seconds,
std::unique_ptr<CensusViewProvider> census_view_provider,
std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
: feedback_sample_window_seconds_(feedback_sample_window_seconds),
census_view_provider_(std::move(census_view_provider)),
cpu_stats_provider_(std::move(cpu_stats_provider)) {
// Append the initial record so that the next real record can have a base.
AppendNewFeedbackRecord(0, 0);
}
// Fetches the latest data from Census and merge it into the data store.
// Also adds a new sample to the LB feedback sliding window.
// Thread-unsafe. (1). The access to the load data store and feedback records
// has locking. (2). The access to the Census view provider and CPU stats
// provider lacks locking, but we only access these two members in this method
// (in testing, we also access them when setting up expectation). So the
// invocations of this method must be serialized.
void FetchAndSample();
// Generates a report for that host and balancer. The report contains
// all the stats data accumulated between the last report (i.e., the last
// consumption) and the last fetch from Census (i.e., the last production).
// Thread-safe.
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
const grpc::string& hostname, const grpc::string& lb_id);
// The feedback is calculated from the stats data recorded in the sliding
// window. Outdated records are discarded.
// Thread-safe.
::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
// Wrapper around LoadDataStore::ReportStreamCreated.
// Thread-safe.
void ReportStreamCreated(const grpc::string& hostname,
const grpc::string& lb_id,
const grpc::string& load_key);
// Wrapper around LoadDataStore::ReportStreamClosed.
// Thread-safe.
void ReportStreamClosed(const grpc::string& hostname,
const grpc::string& lb_id);
// Generates a unique LB ID of length kLbIdLength. Returns an empty string
// upon failure. Thread-safe.
grpc::string GenerateLbId();
// Accessors only for testing.
CensusViewProvider* census_view_provider() {
return census_view_provider_.get();
}
CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
private:
struct LoadBalancingFeedbackRecord {
std::chrono::system_clock::time_point end_time;
uint64_t rpcs;
uint64_t errors;
uint64_t cpu_usage;
uint64_t cpu_limit;
LoadBalancingFeedbackRecord(
const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
: end_time(end_time),
rpcs(rpcs),
errors(errors),
cpu_usage(cpu_usage),
cpu_limit(cpu_limit) {}
};
// Finds the view data about starting call from the view_data_map and merges
// the data to the load data store.
void ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map);
// Finds the view data about ending call from the view_data_map and merges the
// data to the load data store.
void ProcessViewDataCallEnd(
const CensusViewProvider::ViewDataMap& view_data_map);
// Finds the view data about the customized call metrics from the
// view_data_map and merges the data to the load data store.
void ProcessViewDataOtherCallMetrics(
const CensusViewProvider::ViewDataMap& view_data_map);
bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
std::chrono::system_clock::time_point now) {
return record.end_time > now - feedback_sample_window_seconds_;
}
void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
// Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
// it to the load.
void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
const PerBalancerStore& per_balancer_store);
std::atomic<int64_t> next_lb_id_{0};
const std::chrono::seconds feedback_sample_window_seconds_;
std::mutex feedback_mu_;
std::deque<LoadBalancingFeedbackRecord> feedback_records_;
// TODO(juanlishen): Lock in finer grain. Locking the whole store may be
// too expensive.
std::mutex store_mu_;
LoadDataStore load_data_store_;
std::unique_ptr<CensusViewProvider> census_view_provider_;
std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
};
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H

@ -14,11 +14,21 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
grpc_package(name = "lb", visibility = "public")
grpc_package(
name = "lb",
visibility = "public",
)
grpc_proto_library(
name = "load_balancer_proto",
srcs = ["load_balancer.proto"],
)
grpc_proto_library(
name = "load_reporter_proto",
srcs = ["load_reporter.proto"],
has_services = True,
well_known_protos = True,
)

@ -0,0 +1,180 @@
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.lb.v1;
import "google/protobuf/duration.proto";
// The LoadReporter service.
service LoadReporter {
// Report load from server to lb.
rpc ReportLoad(stream LoadReportRequest)
returns (stream LoadReportResponse) {
};
}
message LoadReportRequest {
// This message should be sent on the first request to the gRPC server.
InitialLoadReportRequest initial_request = 1;
}
message InitialLoadReportRequest {
// The hostname this load reporter client is requesting load for.
string load_balanced_hostname = 1;
// Additional information to disambiguate orphaned load: load that should have
// gone to this load reporter client, but was not able to be sent since the
// load reporter client has disconnected. load_key is sent in orphaned load
// reports; see Load.load_key.
bytes load_key = 2;
// This interval defines how often the server should send load reports to
// the load balancer.
google.protobuf.Duration load_report_interval = 3;
}
message LoadReportResponse {
// This message should be sent on the first response to the load balancer.
InitialLoadReportResponse initial_response = 1;
// Reports server-wide statistics for load balancing.
// This should be reported with every response.
LoadBalancingFeedback load_balancing_feedback = 2;
// A load report for each <tag, user_id> tuple. This could be considered to be
// a multimap indexed by <tag, user_id>. It is not strictly necessary to
// aggregate all entries into one entry per <tag, user_id> tuple, although it
// is preferred to do so.
repeated Load load = 3;
}
message InitialLoadReportResponse {
// Initial response returns the Load balancer ID. This must be plain text
// (printable ASCII).
string load_balancer_id = 1;
enum ImplementationIdentifier {
IMPL_UNSPECIFIED = 0;
CPP = 1; // Standard Google C++ implementation.
JAVA = 2; // Standard Google Java implementation.
GO = 3; // Standard Google Go implementation.
}
// Optional identifier of this implementation of the load reporting server.
ImplementationIdentifier implementation_id = 2;
// Optional server_version should be a value that is modified (and
// monotonically increased) when changes are made to the server
// implementation.
int64 server_version = 3;
}
message LoadBalancingFeedback {
// Reports the current utilization of the server (typical range [0.0 - 1.0]).
float server_utilization = 1;
// The total rate of calls handled by this server (including errors).
float calls_per_second = 2;
// The total rate of error responses sent by this server.
float errors_per_second = 3;
}
message Load {
// The (plain text) tag used by the calls covered by this load report. The
// tag is that part of the load balancer token after removing the load
// balancer id. Empty is equivalent to non-existent tag.
string load_balance_tag = 1;
// The user identity authenticated by the calls covered by this load
// report. Empty is equivalent to no known user_id.
string user_id = 3;
// IP address of the client that sent these requests, serialized in
// network-byte-order. It may either be an IPv4 or IPv6 address.
bytes client_ip_address = 15;
// The number of calls started (since the last report) with the given tag and
// user_id.
int64 num_calls_started = 4;
// Indicates whether this load report is an in-progress load report in which
// num_calls_in_progress is the only valid entry. If in_progress_report is not
// set, num_calls_in_progress will be ignored. If in_progress_report is set,
// fields other than num_calls_in_progress and orphaned_load will be ignored.
oneof in_progress_report {
// The number of calls in progress (instantaneously) per load balancer id.
int64 num_calls_in_progress = 5;
}
// The following values are counts or totals of call statistics that finished
// with the given tag and user_id.
int64 num_calls_finished_without_error = 6; // Calls with status OK.
int64 num_calls_finished_with_error = 7; // Calls with status non-OK.
// Calls that finished with a status that maps to HTTP 5XX (see
// googleapis/google/rpc/code.proto). Note that this is a subset of
// num_calls_finished_with_error.
int64 num_calls_finished_with_server_error = 16;
// Totals are from calls that with _and_ without error.
int64 total_bytes_sent = 8;
int64 total_bytes_received = 9;
google.protobuf.Duration total_latency = 10;
// Optional metrics reported for the call(s). Requires that metric_name is
// unique.
repeated CallMetricData metric_data = 11;
// The following two fields are used for reporting orphaned load: load that
// could not be reported to the originating balancer either since the balancer
// is no longer connected or because the frontend sent an invalid token. These
// fields must not be set with normal (unorphaned) load reports.
oneof orphaned_load {
// Load_key is the load_key from the initial_request from the originating
// balancer.
bytes load_key = 12 [deprecated=true];
// If true then this load report is for calls that had an invalid token; the
// user is probably abusing the gRPC protocol.
// TODO(yankaiz): Rename load_key_unknown.
bool load_key_unknown = 13;
// load_key and balancer_id are included in order to identify orphaned load
// from different origins.
OrphanedLoadIdentifier orphaned_load_identifier = 14;
}
reserved 2;
}
message CallMetricData {
// Name of the metric; may be empty.
string metric_name = 1;
// Number of calls that finished and included this metric.
int64 num_calls_finished_with_metric = 2;
// Sum of metric values across all calls that finished with this metric.
double total_metric_value = 3;
}
message OrphanedLoadIdentifier {
// The load_key from the initial_request from the originating balancer.
bytes load_key = 1;
// The unique ID generated by LoadReporter to identify balancers. Here it
// distinguishes orphaned load with a same load_key.
string load_balancer_id = 2;
}

@ -14,7 +14,7 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
grpc_package(name = "test/cpp/server/load_reporter")
@ -29,3 +29,35 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "lb_load_reporter_test",
srcs = ["load_reporter_test.cc"],
external_deps = [
"gtest",
"gmock",
"opencensus-stats-test",
],
deps = [
"//:gpr",
"//:grpc",
"//:lb_load_reporter",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "lb_get_cpu_stats_test",
srcs = ["get_cpu_stats_test.cc"],
external_deps = [
"gtest",
],
deps = [
"//:gpr",
"//:grpc",
"//:lb_get_cpu_stats",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,61 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/grpc.h>
#include <gtest/gtest.h>
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
namespace grpc {
namespace testing {
namespace {
TEST(GetCpuStatsTest, ReadOnce) { ::grpc::load_reporter::GetCpuStatsImpl(); }
TEST(GetCpuStatsTest, BusyNoLargerThanTotal) {
auto p = ::grpc::load_reporter::GetCpuStatsImpl();
uint64_t busy = p.first;
uint64_t total = p.second;
ASSERT_LE(busy, total);
}
TEST(GetCpuStatsTest, Ascending) {
const size_t kRuns = 100;
auto prev = ::grpc::load_reporter::GetCpuStatsImpl();
for (size_t i = 0; i < kRuns; ++i) {
auto cur = ::grpc::load_reporter::GetCpuStatsImpl();
ASSERT_LE(prev.first, cur.first);
ASSERT_LE(prev.second, cur.second);
prev = cur;
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -393,9 +393,9 @@ TEST_F(PerBalancerStoreTest, Suspend) {
TEST_F(PerBalancerStoreTest, DataAggregation) {
PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
// Construct some Values.
LoadRecordValue v1(992, 34, 13, 234.0, 164.0, 173467.38);
LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
LoadRecordValue v2(4842, 213, 9, 393.0, 974.0, 1345.2398);
LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
// v3 doesn't change the number of in-progress RPCs.

@ -0,0 +1,498 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/impl/codegen/port_platform.h>
#include <set>
#include <vector>
#include <gmock/gmock.h>
#include <grpc/grpc.h>
#include <gtest/gtest.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "opencensus/stats/testing/test_utils.h"
namespace grpc {
namespace testing {
namespace {
using ::grpc::lb::v1::LoadBalancingFeedback;
using ::grpc::load_reporter::CensusViewProvider;
using ::grpc::load_reporter::CpuStatsProvider;
using ::grpc::load_reporter::LoadReporter;
using ::opencensus::stats::View;
using ::opencensus::stats::ViewData;
using ::opencensus::stats::ViewDataImpl;
using ::opencensus::stats::ViewDescriptor;
using ::testing::DoubleNear;
using ::testing::Return;
constexpr uint64_t kFeedbackSampleWindowSeconds = 5;
constexpr uint64_t kFetchAndSampleIntervalSeconds = 1;
constexpr uint64_t kNumFeedbackSamplesInWindow =
kFeedbackSampleWindowSeconds / kFetchAndSampleIntervalSeconds;
class MockCensusViewProvider : public CensusViewProvider {
public:
MOCK_METHOD0(FetchViewData, CensusViewProvider::ViewDataMap());
const ::opencensus::stats::ViewDescriptor& FindViewDescriptor(
const grpc::string& view_name) {
auto it = view_descriptor_map().find(view_name);
GPR_ASSERT(it != view_descriptor_map().end());
return it->second;
}
};
class MockCpuStatsProvider : public CpuStatsProvider {
public:
MOCK_METHOD0(GetCpuStats, CpuStatsProvider::CpuStatsSample());
};
class LoadReporterTest : public ::testing::Test {
public:
LoadReporterTest() {}
MockCensusViewProvider* mock_census_view_provider() {
return static_cast<MockCensusViewProvider*>(
load_reporter_->census_view_provider());
}
void PrepareCpuExpectation(size_t call_num) {
auto mock_cpu_stats_provider = static_cast<MockCpuStatsProvider*>(
load_reporter_->cpu_stats_provider());
::testing::InSequence s;
for (size_t i = 0; i < call_num; ++i) {
EXPECT_CALL(*mock_cpu_stats_provider, GetCpuStats())
.WillOnce(Return(kCpuStatsSamples[i]))
.RetiresOnSaturation();
}
}
CpuStatsProvider::CpuStatsSample initial_cpu_stats_{2, 20};
const std::vector<CpuStatsProvider::CpuStatsSample> kCpuStatsSamples = {
{13, 53}, {64, 96}, {245, 345}, {314, 785},
{874, 1230}, {1236, 2145}, {1864, 2974}};
std::unique_ptr<LoadReporter> load_reporter_;
const grpc::string kHostname1 = "kHostname1";
const grpc::string kHostname2 = "kHostname2";
const grpc::string kHostname3 = "kHostname3";
// Pad to the length of a valid LB ID.
const grpc::string kLbId1 = "kLbId111";
const grpc::string kLbId2 = "kLbId222";
const grpc::string kLbId3 = "kLbId333";
const grpc::string kLbId4 = "kLbId444";
const grpc::string kLoadKey1 = "kLoadKey1";
const grpc::string kLoadKey2 = "kLoadKey2";
const grpc::string kLoadKey3 = "kLoadKey3";
const grpc::string kLbTag1 = "kLbTag1";
const grpc::string kLbTag2 = "kLbTag2";
const grpc::string kLbToken1 = "kLbId111kLbTag1";
const grpc::string kLbToken2 = "kLbId222kLbTag2";
const grpc::string kUser1 = "kUser1";
const grpc::string kUser2 = "kUser2";
const grpc::string kUser3 = "kUser3";
const grpc::string kClientIp0 = "00";
const grpc::string kClientIp1 = "0800000001";
const grpc::string kClientIp2 = "3200000000000000000000000000000002";
const grpc::string kMetric1 = "kMetric1";
const grpc::string kMetric2 = "kMetric2";
private:
void SetUp() override {
auto mock_cpu = new MockCpuStatsProvider();
auto mock_census = new MockCensusViewProvider();
// Prepare the initial CPU stats data. Note that the expectation should be
// set up before the load reporter is initialized, because CPU stats is
// sampled at that point.
EXPECT_CALL(*mock_cpu, GetCpuStats())
.WillOnce(Return(initial_cpu_stats_))
.RetiresOnSaturation();
load_reporter_ = std::unique_ptr<LoadReporter>(
new LoadReporter(kFeedbackSampleWindowSeconds,
std::unique_ptr<CensusViewProvider>(mock_census),
std::unique_ptr<CpuStatsProvider>(mock_cpu)));
}
};
class LbFeedbackTest : public LoadReporterTest {
public:
// Note that [start, start + count) of the fake samples (maybe plus the
// initial record) are in the window now.
void VerifyLbFeedback(const LoadBalancingFeedback& lb_feedback, size_t start,
size_t count) {
const CpuStatsProvider::CpuStatsSample* base =
start == 0 ? &initial_cpu_stats_ : &kCpuStatsSamples[start - 1];
double expected_cpu_util =
static_cast<double>(kCpuStatsSamples[start + count - 1].first -
base->first) /
static_cast<double>(kCpuStatsSamples[start + count - 1].second -
base->second);
ASSERT_THAT(static_cast<double>(lb_feedback.server_utilization()),
DoubleNear(expected_cpu_util, 0.00001));
double qps_sum = 0, eps_sum = 0;
for (size_t i = 0; i < count; ++i) {
qps_sum += kQpsEpsSamples[start + i].first;
eps_sum += kQpsEpsSamples[start + i].second;
}
double expected_qps = qps_sum / count;
double expected_eps = eps_sum / count;
// TODO(juanlishen): The error is big because we use sleep(). It should be
// much smaller when we use fake clock.
ASSERT_THAT(static_cast<double>(lb_feedback.calls_per_second()),
DoubleNear(expected_qps, expected_qps / 50));
ASSERT_THAT(static_cast<double>(lb_feedback.errors_per_second()),
DoubleNear(expected_eps, expected_eps / 50));
gpr_log(GPR_INFO,
"Verified LB feedback matches the samples of index [%lu, %lu).",
start, start + count);
}
const std::vector<std::pair<double, double>> kQpsEpsSamples = {
{546.1, 153.1}, {62.1, 54.1}, {578.1, 154.2}, {978.1, 645.1},
{1132.1, 846.4}, {531.5, 315.4}, {874.1, 324.9}};
};
TEST_F(LbFeedbackTest, ZeroDuration) {
PrepareCpuExpectation(kCpuStatsSamples.size());
EXPECT_CALL(*mock_census_view_provider(), FetchViewData())
.WillRepeatedly(
Return(::grpc::load_reporter::CensusViewProvider::ViewDataMap()));
// Verify that divide-by-zero exception doesn't happen.
for (size_t i = 0; i < kCpuStatsSamples.size(); ++i) {
load_reporter_->FetchAndSample();
}
load_reporter_->GenerateLoadBalancingFeedback();
}
TEST_F(LbFeedbackTest, Normal) {
// Prepare view data list using the <QPS, EPS> samples.
std::vector<CensusViewProvider::ViewDataMap> view_data_map_list;
for (const auto& p : LbFeedbackTest::kQpsEpsSamples) {
double qps = p.first;
double eps = p.second;
double ok_count = (qps - eps) * kFetchAndSampleIntervalSeconds;
double error_count = eps * kFetchAndSampleIntervalSeconds;
double ok_count_1 = ok_count / 3.0;
double ok_count_2 = ok_count - ok_count_1;
auto end_count_vd = ::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndCount),
{{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
ok_count_1},
{{kClientIp0 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
ok_count_2},
{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
error_count}});
// Values for other view data don't matter.
auto end_bytes_sent_vd =
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesSent),
{{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
0}});
auto end_bytes_received_vd =
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesReceived),
{{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
0}});
auto end_latency_vd = ::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndLatencyMs),
{{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
0},
{{kClientIp0 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
0}});
view_data_map_list.push_back(
{{::grpc::load_reporter::kViewEndCount, end_count_vd},
{::grpc::load_reporter::kViewEndBytesSent, end_bytes_sent_vd},
{::grpc::load_reporter::kViewEndBytesReceived, end_bytes_received_vd},
{::grpc::load_reporter::kViewEndLatencyMs, end_latency_vd}});
}
{
::testing::InSequence s;
for (size_t i = 0; i < view_data_map_list.size(); ++i) {
EXPECT_CALL(*mock_census_view_provider(), FetchViewData())
.WillOnce(Return(view_data_map_list[i]))
.RetiresOnSaturation();
}
}
PrepareCpuExpectation(kNumFeedbackSamplesInWindow + 2);
// When the load reporter is created, a trivial LB feedback record is added.
// But that's not enough for generating an LB feedback.
// Fetch some view data so that non-trivial LB feedback can be generated.
for (size_t i = 0; i < kNumFeedbackSamplesInWindow / 2; ++i) {
// TODO(juanlishen): Find some fake clock to speed up testing.
sleep(1);
load_reporter_->FetchAndSample();
}
VerifyLbFeedback(load_reporter_->GenerateLoadBalancingFeedback(), 0,
kNumFeedbackSamplesInWindow / 2);
// Fetch more view data so that the feedback record window is just full (the
// initial record just falls out of the window).
for (size_t i = 0; i < (kNumFeedbackSamplesInWindow + 1) / 2; ++i) {
sleep(1);
load_reporter_->FetchAndSample();
}
VerifyLbFeedback(load_reporter_->GenerateLoadBalancingFeedback(), 0,
kNumFeedbackSamplesInWindow);
// Further fetching will cause the old records to fall out of the window.
for (size_t i = 0; i < 2; ++i) {
sleep(1);
load_reporter_->FetchAndSample();
}
VerifyLbFeedback(load_reporter_->GenerateLoadBalancingFeedback(), 2,
kNumFeedbackSamplesInWindow);
}
using LoadReportTest = LoadReporterTest;
TEST_F(LoadReportTest, BasicReport) {
// Make up the first view data map.
CensusViewProvider::ViewDataMap vdm1;
vdm1.emplace(
::grpc::load_reporter::kViewStartCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewStartCount),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1}, 1234},
{{kClientIp2 + kLbToken1, kHostname1, kUser1}, 1225},
{{kClientIp0 + kLbToken1, kHostname1, kUser1}, 10},
{{kClientIp2 + kLbToken1, kHostname1, kUser2}, 464},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3}, 101},
{{kClientIp1 + kLbToken2, kHostname2, kUser3}, 17},
{{kClientIp2 + kLbId3 + kLbTag2, kHostname2, kUser3}, 23}}));
vdm1.emplace(::grpc::load_reporter::kViewEndCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndCount),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
641},
{{kClientIp2 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
272},
{{kClientIp2 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
996},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
34},
{{kClientIp1 + kLbToken2, kHostname2, kUser2,
::grpc::load_reporter::kCallStatusOk},
18}}));
vdm1.emplace(::grpc::load_reporter::kViewEndBytesSent,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesSent),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
8977},
{{kClientIp2 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
266},
{{kClientIp2 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
1276},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
77823},
{{kClientIp1 + kLbToken2, kHostname2, kUser2,
::grpc::load_reporter::kCallStatusOk},
48}}));
vdm1.emplace(::grpc::load_reporter::kViewEndBytesReceived,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesReceived),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
2341},
{{kClientIp2 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
466},
{{kClientIp2 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
518},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
81},
{{kClientIp1 + kLbToken2, kHostname2, kUser2,
::grpc::load_reporter::kCallStatusOk},
27}}));
vdm1.emplace(::grpc::load_reporter::kViewEndLatencyMs,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndLatencyMs),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
3.14},
{{kClientIp2 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusClientError},
5.26},
{{kClientIp2 + kLbToken1, kHostname1, kUser2,
::grpc::load_reporter::kCallStatusOk},
45.4},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
4.4},
{{kClientIp1 + kLbToken2, kHostname2, kUser2,
::grpc::load_reporter::kCallStatusOk},
2348.0}}));
vdm1.emplace(
::grpc::load_reporter::kViewOtherCallMetricCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewOtherCallMetricCount),
{{{kClientIp1 + kLbToken1, kHostname1, kUser2, kMetric1}, 1},
{{kClientIp1 + kLbToken1, kHostname1, kUser2, kMetric1}, 1},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric2},
1}}));
vdm1.emplace(
::grpc::load_reporter::kViewOtherCallMetricValue,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewOtherCallMetricValue),
{{{kClientIp1 + kLbToken1, kHostname1, kUser2, kMetric1}, 1.2},
{{kClientIp1 + kLbToken1, kHostname1, kUser2, kMetric1}, 1.2},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric2},
3.2}}));
// Make up the second view data map.
CensusViewProvider::ViewDataMap vdm2;
vdm2.emplace(
::grpc::load_reporter::kViewStartCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewStartCount),
{{{kClientIp2 + kLbToken1, kHostname1, kUser1}, 3},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3}, 778}}));
vdm2.emplace(::grpc::load_reporter::kViewEndCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndCount),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
24},
{{kClientIp1 + kLbToken2, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
546}}));
vdm2.emplace(::grpc::load_reporter::kViewEndBytesSent,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesSent),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
747},
{{kClientIp1 + kLbToken2, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
229}}));
vdm2.emplace(::grpc::load_reporter::kViewEndBytesReceived,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndBytesReceived),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
173},
{{kClientIp1 + kLbToken2, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
438}}));
vdm2.emplace(::grpc::load_reporter::kViewEndLatencyMs,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewEndLatencyMs),
{{{kClientIp1 + kLbToken1, kHostname1, kUser1,
::grpc::load_reporter::kCallStatusOk},
187},
{{kClientIp1 + kLbToken2, kHostname2, kUser3,
::grpc::load_reporter::kCallStatusClientError},
34}}));
vdm2.emplace(
::grpc::load_reporter::kViewOtherCallMetricCount,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewOtherCallMetricCount),
{{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric1}, 1},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric2},
1}}));
vdm2.emplace(
::grpc::load_reporter::kViewOtherCallMetricValue,
::opencensus::stats::testing::TestUtils::MakeViewData(
mock_census_view_provider()->FindViewDescriptor(
::grpc::load_reporter::kViewOtherCallMetricValue),
{{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric1}, 9.6},
{{kClientIp1 + kLbId2 + kLbTag1, kHostname2, kUser3, kMetric2},
5.7}}));
// Set up mock expectation.
EXPECT_CALL(*mock_census_view_provider(), FetchViewData())
.WillOnce(Return(vdm1))
.WillOnce(Return(vdm2));
PrepareCpuExpectation(2);
// Start testing.
load_reporter_->ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
load_reporter_->ReportStreamCreated(kHostname2, kLbId2, kLoadKey2);
load_reporter_->ReportStreamCreated(kHostname2, kLbId3, kLoadKey3);
// First fetch.
load_reporter_->FetchAndSample();
load_reporter_->GenerateLoads(kHostname1, kLbId1);
gpr_log(GPR_INFO, "First load generated.");
// Second fetch.
load_reporter_->FetchAndSample();
load_reporter_->GenerateLoads(kHostname2, kLbId2);
gpr_log(GPR_INFO, "Second load generated.");
// TODO(juanlishen): Verify the data.
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save