Merge pull request #15853 from AspirinSJL/load_reporting_service

Add server load reporting service
pull/15851/merge
Juanli Shen 6 years ago committed by GitHub
commit 4f840eafec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      BUILD
  2. 53
      include/grpcpp/ext/server_load_reporting.h
  3. 2
      include/grpcpp/impl/codegen/server_context.h
  4. 65
      src/core/ext/filters/load_reporting/registered_opencensus_objects.h
  5. 54
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  6. 9
      src/core/ext/filters/load_reporting/server_load_reporting_filter.h
  7. 10
      src/cpp/server/load_reporter/constants.h
  8. 3
      src/cpp/server/load_reporter/load_data_store.h
  9. 79
      src/cpp/server/load_reporter/load_reporter.cc
  10. 5
      src/cpp/server/load_reporter/load_reporter.h
  11. 370
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  12. 194
      src/cpp/server/load_reporter/load_reporter_async_service_impl.h
  13. 41
      src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
  14. 60
      src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
  15. 62
      src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
  16. 45
      src/cpp/server/load_reporter/util.cc
  17. 4
      src/proto/grpc/lb/v1/load_reporter.proto
  18. 16
      test/cpp/end2end/BUILD
  19. 191
      test/cpp/end2end/server_load_reporting_end2end_test.cc
  20. 1
      test/cpp/server/load_reporter/BUILD
  21. 9
      test/cpp/server/load_reporter/load_reporter_test.cc

55
BUILD

@ -98,10 +98,10 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/grpc.h",
"include/grpc/grpc_posix.h",
"include/grpc/grpc_security_constants.h",
"include/grpc/load_reporting.h",
"include/grpc/slice.h",
"include/grpc/slice_buffer.h",
"include/grpc/status.h",
"include/grpc/load_reporting.h",
"include/grpc/support/workaround_list.h",
]
@ -1201,9 +1201,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@ -1211,9 +1211,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@ -1234,9 +1234,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@ -1244,9 +1244,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@ -1333,6 +1333,51 @@ grpc_cc_library(
],
)
# C++ library holding the load reporting service ServerBuilder plugin
# (load_reporting_service_server_builder_plugin.{cc,h}). It depends on the
# lb_load_reporter_service target.
grpc_cc_library(
name = "lb_server_load_reporting_service_server_builder_plugin",
srcs = [
"src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc",
],
hdrs = [
"src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h",
],
language = "c++",
deps = [
"lb_load_reporter_service",
],
)
# Library exposing the public header include/grpcpp/ext/server_load_reporting.h.
# It bundles the ServerBuilder option and util sources and pulls in the server
# load reporting filter and the service ServerBuilder plugin.
# NOTE(review): alwayslink = 1 is presumably required so that objects that are
# only reached through static-initialization-time registration are not dropped
# by the linker -- confirm.
grpc_cc_library(
name = "grpcpp_server_load_reporting",
srcs = [
"src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc",
"src/cpp/server/load_reporter/util.cc",
],
language = "c++",
public_hdrs = [
"include/grpcpp/ext/server_load_reporting.h",
],
deps = [
"lb_server_load_reporting_filter",
"lb_server_load_reporting_service_server_builder_plugin",
],
alwayslink = 1,
)
# C++ library holding the asynchronous load reporter service implementation
# (load_reporter_async_service_impl.{cc,h}). It depends on the lb_load_reporter
# target.
grpc_cc_library(
name = "lb_load_reporter_service",
srcs = [
"src/cpp/server/load_reporter/load_reporter_async_service_impl.cc",
],
hdrs = [
"src/cpp/server/load_reporter/load_reporter_async_service_impl.h",
],
language = "c++",
deps = [
"lb_load_reporter",
],
)
grpc_cc_library(
name = "lb_get_cpu_stats",
srcs = [

@ -0,0 +1,53 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_EXT_SERVER_LOAD_REPORTING_H
#define GRPCPP_EXT_SERVER_LOAD_REPORTING_H
#include <grpc/support/port_platform.h>
#include <grpc/load_reporting.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/server_builder_option.h>
namespace grpc {
namespace load_reporter {
namespace experimental {
// ServerBuilderOption that turns on the server-side load reporting feature.
// To enable the feature, make sure the binary is built with the
// grpcpp_server_load_reporting library and set this option on the
// ServerBuilder.
class LoadReportingServiceServerBuilderOption : public ServerBuilderOption {
public:
// ServerBuilderOption hook that receives the channel arguments.
// NOTE(review): presumably sets GRPC_ARG_ENABLE_LOAD_REPORTING; the
// definition is not in this header -- confirm.
void UpdateArguments(::grpc::ChannelArguments* args) override;
// ServerBuilderOption hook that receives the list of server builder plugins.
// NOTE(review): presumably appends the load reporting service plugin; the
// definition is not in this header -- confirm.
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
plugins) override;
};
// Adds the load reporting cost with \a cost_name and \a cost_value in the
// trailing metadata of the server context.
void AddLoadReportingCost(grpc::ServerContext* ctx,
const grpc::string& cost_name, double cost_value);
} // namespace experimental
} // namespace load_reporter
} // namespace grpc
#endif // GRPCPP_EXT_SERVER_LOAD_REPORTING_H

@ -201,7 +201,7 @@ class ServerContext {
/// \param algorithm The compression algorithm used for the server call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
/// Set the load reporting costs in \a cost_data for the call.
/// Set the serialized load reporting costs in \a cost_data for the call.
void SetLoadReportingCosts(const std::vector<grpc::string>& cost_data);
/// Return the authentication context for this server call.

@ -28,75 +28,84 @@
namespace grpc {
namespace load_reporter {
// Note that the functions here are specified as inline to share the static
// objects across all the translation units including this header. See more
// details on https://en.cppreference.com/w/cpp/language/inline.
// Measures.
::opencensus::stats::MeasureInt64 MeasureStartCount() {
static const ::opencensus::stats::MeasureInt64 start_count =
inline ::opencensus::stats::MeasureInt64 MeasureStartCount() {
static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureStartCount, kMeasureStartCount, kMeasureStartCount);
return start_count;
return measure;
}
::opencensus::stats::MeasureInt64 MeasureEndCount() {
static const ::opencensus::stats::MeasureInt64 end_count =
inline ::opencensus::stats::MeasureInt64 MeasureEndCount() {
static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndCount, kMeasureEndCount, kMeasureEndCount);
return end_count;
return measure;
}
::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
static const ::opencensus::stats::MeasureInt64 end_bytes_sent =
inline ::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent);
return end_bytes_sent;
return measure;
}
::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
static const ::opencensus::stats::MeasureInt64 end_bytes_received =
inline ::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived,
kMeasureEndBytesReceived,
kMeasureEndBytesReceived);
return end_bytes_received;
return measure;
}
::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
static const ::opencensus::stats::MeasureInt64 end_latency_ms =
inline ::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs);
return end_latency_ms;
return measure;
}
::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
static const ::opencensus::stats::MeasureDouble other_call_metric =
inline ::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
static const ::opencensus::stats::MeasureDouble measure =
::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric,
kMeasureOtherCallMetric,
kMeasureOtherCallMetric);
return other_call_metric;
return measure;
}
// Tags.
opencensus::stats::TagKey TagKeyToken() {
static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken);
inline ::opencensus::stats::TagKey TagKeyToken() {
static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyToken);
return token;
}
opencensus::stats::TagKey TagKeyHost() {
static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost);
inline ::opencensus::stats::TagKey TagKeyHost() {
static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyHost);
return token;
}
opencensus::stats::TagKey TagKeyUserId() {
static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId);
inline ::opencensus::stats::TagKey TagKeyUserId() {
static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyUserId);
return token;
}
opencensus::stats::TagKey TagKeyStatus() {
static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus);
inline ::opencensus::stats::TagKey TagKeyStatus() {
static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyStatus);
return token;
}
opencensus::stats::TagKey TagKeyMetricName() {
static const auto token =
inline ::opencensus::stats::TagKey TagKeyMetricName() {
static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyMetricName);
return token;
}

@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc_security.h>
#include <grpc/load_reporting.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -31,18 +30,20 @@
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
namespace grpc {
constexpr char kEncodedIpv4AddressLengthString[] = "08";
constexpr char kEncodedIpv6AddressLengthString[] = "32";
constexpr char kEmptyAddressLengthString[] = "00";
constexpr size_t kLengthPrefixSize = 2;
grpc_error* ServerLoadReportingChannelData::Init(
grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
@ -90,7 +91,9 @@ void ServerLoadReportingCallData::Destroy(
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
gpr_free(client_ip_and_lr_token_);
}
gpr_free(target_host_);
grpc_slice_unref_internal(service_method_);
}
@ -100,7 +103,8 @@ void ServerLoadReportingCallData::StartTransportStreamOpBatch(
if (op->recv_initial_metadata() != nullptr) {
// Save some fields to use when initial metadata is ready.
peer_string_ = op->get_peer_string();
recv_initial_metadata_ = op->recv_initial_metadata();
recv_initial_metadata_ =
op->op()->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
// Substitute the original closure for the wrapper closure.
op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
@ -157,10 +161,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString(
*size = 8;
} else if (addr->sa_family == GRPC_AF_INET6) {
grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
*client_ip_string = static_cast<char*>(gpr_malloc(32));
*client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
for (size_t i = 0; i < 16; ++i) {
sprintf(*client_ip_string + i, "%02x",
addr6->sin6_addr.__in6_u.__u6_addr8[i]);
snprintf(*client_ip_string + i * 2, 2 + 1, "%02x",
addr6->sin6_addr.__in6_u.__u6_addr8[i]);
}
*size = 32;
} else {
@ -241,7 +245,7 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
if (err == GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR(
"server_load_reporting_filter",
grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
grpc_metadata_batch_filter(calld->recv_initial_metadata_,
RecvInitialMetadataFilter, elem,
"recv_initial_metadata filtering error"));
// If the LB token was not found in the recv_initial_metadata, only the
@ -277,7 +281,6 @@ grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
// TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
const grpc_slice value = GRPC_MDVALUE(md);
const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
@ -325,4 +328,35 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus(
}
}
namespace {

// Predicate handed to the channel filter registration: returns the boolean
// value of GRPC_ARG_ENABLE_LOAD_REPORTING in \a args, with false passed as the
// default to grpc_channel_arg_get_bool().
bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
  const auto* enable_arg =
      grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING);
  return grpc_channel_arg_get_bool(enable_arg, false);
}

}  // namespace
// TODO(juanlishen): We should register the filter during grpc initialization
// time once OpenCensus is compatible with our build system. For now, we force
// registration of the server load reporting filter at static initialization
// time if we build with the filter target.
//
// Constructing the single global instance below registers the filter (gated
// by MaybeAddServerLoadReportingFilter) and touches every measure accessor.
struct ServerLoadReportingFilterStaticRegistrar {
  ServerLoadReportingFilterStaticRegistrar() {
    static std::atomic_bool registered{false};
    // Test and set the flag in one atomic step. A separate load followed by a
    // later store would let two concurrent constructions both pass the check
    // and register the filter twice.
    if (registered.exchange(true)) return;
    RegisterChannelFilter<ServerLoadReportingChannelData,
                          ServerLoadReportingCallData>(
        "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
        MaybeAddServerLoadReportingFilter);
    // Access measures to ensure they are initialized. Otherwise, we can't
    // create any valid view before the first RPC.
    ::grpc::load_reporter::MeasureStartCount();
    ::grpc::load_reporter::MeasureEndCount();
    ::grpc::load_reporter::MeasureEndBytesSent();
    ::grpc::load_reporter::MeasureEndBytesReceived();
    ::grpc::load_reporter::MeasureEndLatencyMs();
    ::grpc::load_reporter::MeasureOtherCallMetric();
  }
} server_load_reporting_filter_static_registrar;
} // namespace grpc

@ -86,8 +86,8 @@ class ServerLoadReportingCallData : public CallData {
// The received initial metadata (a member of the recv_initial_metadata op).
// When it is ready, we will extract some data from it via
// recv_initial_metadata_ready_ closure, before the original
// recv_initial_metadata_ready closure,
MetadataBatch* recv_initial_metadata_;
// recv_initial_metadata_ready closure.
grpc_metadata_batch* recv_initial_metadata_;
// The original recv_initial_metadata closure, which is wrapped by our own
// closure (recv_initial_metadata_ready_) to capture the incoming initial
@ -112,11 +112,6 @@ class ServerLoadReportingCallData : public CallData {
// token.
char* client_ip_and_lr_token_;
size_t client_ip_and_lr_token_len_;
static constexpr char kEncodedIpv4AddressLengthString[] = "08";
static constexpr char kEncodedIpv6AddressLengthString[] = "32";
static constexpr char kEmptyAddressLengthString[] = "00";
static constexpr size_t kLengthPrefixSize = 2;
};
} // namespace grpc

@ -24,6 +24,16 @@
namespace grpc {
namespace load_reporter {
// TODO(juanlishen): Update the version number with the PR number every time
// there is any change to the server load reporter.
constexpr uint32_t kVersion = 15853;
// TODO(juanlishen): This window size is from the internal spec for the load
// reporter. Need to ask the gRPC LB team whether we should make this and the
// fetching interval configurable.
constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
constexpr size_t kLbIdLength = 8;
constexpr size_t kIpv4AddressLength = 8;
constexpr size_t kIpv6AddressLength = 32;

@ -160,7 +160,8 @@ class LoadRecordValue {
", error_count_=" + grpc::to_string(error_count_) +
", bytes_sent_=" + grpc::to_string(bytes_sent_) +
", bytes_recv_=" + grpc::to_string(bytes_recv_) +
", latency_ms_=" + grpc::to_string(latency_ms_) + "]";
", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
}
bool InsertCallMetric(const grpc::string& metric_name,

@ -22,6 +22,7 @@
#include <stdio.h>
#include <chrono>
#include <ctime>
#include <iterator>
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
@ -65,8 +66,8 @@ CensusViewProvider::CensusViewProvider()
// measurements instead of setting the data values directly.
auto vd_end_count =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndCount))
.set_measure((kMeasureEndCount))
.set_name(kViewEndCount)
.set_measure(kMeasureEndCount)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -80,8 +81,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
auto vd_end_bytes_sent =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndBytesSent))
.set_measure((kMeasureEndBytesSent))
.set_name(kViewEndBytesSent)
.set_measure(kMeasureEndBytesSent)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -95,8 +96,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
auto vd_end_bytes_received =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndBytesReceived))
.set_measure((kMeasureEndBytesReceived))
.set_name(kViewEndBytesReceived)
.set_measure(kMeasureEndBytesReceived)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -110,8 +111,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
auto vd_end_latency_ms =
::opencensus::stats::ViewDescriptor()
.set_name((kViewEndLatencyMs))
.set_measure((kMeasureEndLatencyMs))
.set_name(kViewEndLatencyMs)
.set_measure(kMeasureEndLatencyMs)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -126,8 +127,8 @@ CensusViewProvider::CensusViewProvider()
// Two views related to other call metrics.
auto vd_metric_call_count =
::opencensus::stats::ViewDescriptor()
.set_name((kViewOtherCallMetricCount))
.set_measure((kMeasureOtherCallMetric))
.set_name(kViewOtherCallMetricCount)
.set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Count())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -141,8 +142,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
auto vd_metric_value =
::opencensus::stats::ViewDescriptor()
.set_name((kViewOtherCallMetricValue))
.set_measure((kMeasureOtherCallMetric))
.set_name(kViewOtherCallMetricValue)
.set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@ -161,11 +162,26 @@ double CensusViewProvider::GetRelatedViewDataRowDouble(
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
GPR_ASSERT(it_vd->second.type() ==
::opencensus::stats::ViewData::Type::kDouble);
auto it_row = it_vd->second.double_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.double_data().end());
return it_row->second;
}
uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
GPR_ASSERT(it_vd->second.type() ==
::opencensus::stats::ViewData::Type::kInt64);
auto it_row = it_vd->second.int_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.int_data().end());
GPR_ASSERT(it_row->second >= 0);
return it_row->second;
}
CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
for (const auto& p : view_descriptor_map()) {
const grpc::string& view_name = p.first;
@ -235,23 +251,23 @@ LoadReporter::GenerateLoadBalancingFeedback() {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
// Find the longest range with valid ends.
LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
LoadBalancingFeedbackRecord* newest =
&feedback_records_[feedback_records_.size() - 1];
while (newest > oldest &&
auto oldest = feedback_records_.begin();
auto newest = feedback_records_.end() - 1;
while (std::distance(oldest, newest) > 0 &&
(newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
// A zero limit means that reading the system info failed, so these
// records can't be used to calculate CPU utilization.
if (newest->cpu_limit == 0) --newest;
if (oldest->cpu_limit == 0) ++oldest;
}
if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
if (std::distance(oldest, newest) < 1 ||
oldest->end_time == newest->end_time ||
newest->cpu_limit == oldest->cpu_limit) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
uint64_t rpcs = 0;
uint64_t errors = 0;
for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
for (auto p = newest; p != oldest; --p) {
// Because these two numbers are counters, the oldest record shouldn't be
// included.
rpcs += p->rpcs;
@ -338,7 +354,8 @@ void LoadReporter::AttachOrphanLoadId(
if (per_balancer_store.lb_id() == kInvalidLbId) {
load->set_load_key_unknown(true);
} else {
load->set_load_key_unknown(false);
// We shouldn't set load_key_unknown to any value in this case because
// load_key_unknown and orphaned_load_identifier are members of the same oneof.
load->mutable_orphaned_load_identifier()->set_load_key(
per_balancer_store.load_key());
load->mutable_orphaned_load_identifier()->set_load_balancer_id(
@ -381,9 +398,7 @@ void LoadReporter::ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewStartCount);
if (it != view_data_map.end()) {
// Note that the data type for any Sum view is double, whatever the data
// type of the original measure.
for (const auto& p : it->second.double_data()) {
for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t start_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@ -405,9 +420,7 @@ void LoadReporter::ProcessViewDataCallEnd(
uint64_t total_error_count = 0;
auto it = view_data_map.find(kViewEndCount);
if (it != view_data_map.end()) {
// Note that the data type for any Sum view is double, whatever the data
// type of the original measure.
for (const auto& p : it->second.double_data()) {
for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t end_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@ -424,18 +437,16 @@ void LoadReporter::ProcessViewDataCallEnd(
continue;
}
LoadRecordKey key(client_ip_and_token, user_id);
const uint64_t bytes_sent =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
tag_values);
const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
tag_values);
const uint64_t bytes_received =
CensusViewProvider::GetRelatedViewDataRowDouble(
CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndBytesReceived,
sizeof(kViewEndBytesReceived) - 1, tag_values);
const uint64_t latency_ms =
CensusViewProvider::GetRelatedViewDataRowDouble(
view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
tag_values);
const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
tag_values);
uint64_t ok_count = 0;
uint64_t error_count = 0;
total_end_count += end_count;

@ -59,7 +59,10 @@ class CensusViewProvider {
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
double static GetRelatedViewDataRowDouble(
static double GetRelatedViewDataRowDouble(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);
static uint64_t GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);

@ -0,0 +1,370 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
namespace grpc {
namespace load_reporter {
// Invokes the stored handler function with \a ok, handing over the handler
// reference held by this tag. Both the function and the handler must be set.
void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
  GPR_ASSERT(handler_function_ != nullptr);
  GPR_ASSERT(handler_ != nullptr);
  // Detach the handler from the tag first; the callee becomes its owner.
  auto released_handler = std::move(handler_);
  handler_function_(std::move(released_handler), ok);
}
// Takes ownership of \a cq, creates (but does not start) the worker thread
// that runs Work(), and builds the LoadReporter with the default census view
// provider. A CPU stats provider is supplied only on Linux, Windows and Apple
// builds; on other platforms a null provider is passed.
LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
    std::unique_ptr<ServerCompletionQueue> cq)
    : cq_(std::move(cq)) {
  thread_.reset(new ::grpc_core::Thread("server_load_reporting", Work, this));
  std::unique_ptr<CpuStatsProvider> cpu_stats_provider;
#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
  cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
#endif
  std::unique_ptr<CensusViewProvider> census_view_provider(
      new CensusViewProviderDefaultImpl());
  load_reporter_.reset(new LoadReporter(kFeedbackSampleWindowSeconds,
                                        std::move(census_view_provider),
                                        std::move(cpu_stats_provider)));
}
// Marks the service as shut down, shuts down the completion queue while
// holding cq_shutdown_mu_, cancels the pending fetch-and-sample alarm (if
// any), and finally joins the worker thread.
LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
  // We will reach here after the server starts shutting down.
  shutdown_ = true;
  {
    std::lock_guard<std::mutex> guard(cq_shutdown_mu_);
    cq_->Shutdown();
  }
  if (next_fetch_and_sample_alarm_ != nullptr) {
    next_fetch_and_sample_alarm_->Cancel();
  }
  thread_->Join();
}
// Arms a fresh alarm on the completion queue that fires
// kFetchAndSampleIntervalSeconds from now, using this service object as the
// tag. Does nothing (and logs nothing) once shutdown_ is set.
void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
  const auto interval = gpr_time_from_millis(
      kFetchAndSampleIntervalSeconds * 1000, GPR_TIMESPAN);
  const auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), interval);
  {
    // The shutdown check and the alarm setup happen under the same mutex that
    // the destructor holds while shutting down the completion queue.
    std::lock_guard<std::mutex> guard(cq_shutdown_mu_);
    if (shutdown_) return;
    // TODO(juanlishen): Improve the Alarm implementation to reuse a single
    // instance for multiple events.
    next_fetch_and_sample_alarm_.reset(new Alarm);
    next_fetch_and_sample_alarm_->Set(cq_.get(), deadline, this);
  }
  gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
}
// Runs one fetch-and-sample round on the load reporter and schedules the next
// one. When \a ok is false the cycle is not continued; only a log line is
// emitted.
void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
  if (ok) {
    gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
    load_reporter_->FetchAndSample();
    ScheduleNextFetchAndSample();
  } else {
    gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
  }
}
void LoadReporterAsyncServiceImpl::Work(void* arg) {
LoadReporterAsyncServiceImpl* service =
reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
service->FetchAndSample(true /* ok */);
// TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
// to figure out why cq is not ready after service starts.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(1, GPR_TIMESPAN)));
ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
service->load_reporter_.get());
void* tag;
bool ok;
while (true) {
if (!service->cq_->Next(&tag, &ok)) {
// The completion queue is shutting down.
GPR_ASSERT(service->shutdown_);
break;
}
if (tag == service) {
service->FetchAndSample(ok);
} else {
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
}
}
}
// Starts the worker thread that was created in the constructor.
void LoadReporterAsyncServiceImpl::StartThread() {
  thread_->Start();
}
// Creates a handler and requests the next ReportLoad call on |cq|. Ownership
// of the handler is split between its two pending tags (done-notification and
// request-delivery). If the service is already shut down, nothing is requested
// and the new handler is dropped.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
    ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
    LoadReporter* load_reporter) {
  auto owner = std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
  ReportLoadHandler* const raw = owner.get();
  // Check shutdown and enqueue under the same lock so that no tag is added to
  // the completion queue after it has been shut down.
  std::lock_guard<std::mutex> guard(service->cq_shutdown_mu_);
  if (service->shutdown_) return;
  raw->on_done_notified_ = CallableTag(
      [raw](std::shared_ptr<ReportLoadHandler> keep_alive, bool tag_ok) {
        raw->OnDoneNotified(std::move(keep_alive), tag_ok);
      },
      owner);
  raw->next_inbound_ = CallableTag(
      [raw](std::shared_ptr<ReportLoadHandler> keep_alive, bool tag_ok) {
        raw->OnRequestDelivered(std::move(keep_alive), tag_ok);
      },
      std::move(owner));
  raw->ctx_.AsyncNotifyWhenDone(&raw->on_done_notified_);
  service->RequestReportLoad(&raw->ctx_, &raw->stream_, cq, cq,
                             &raw->next_inbound_);
}
// Stores the raw pointers to the completion queue, the parent service and the
// load reporter, attaches the stream to this handler's own ServerContext, and
// starts the call state machine in WAITING_FOR_DELIVERY.
LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
LoadReporter* load_reporter)
: cq_(cq),
service_(service),
load_reporter_(load_reporter),
stream_(&ctx_),
call_status_(WAITING_FOR_DELIVERY) {}
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
std::shared_ptr<ReportLoadHandler> self, bool ok) {
if (ok) {
call_status_ = DELIVERED;
} else {
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
}
if (!ok || shutdown_) {
// The value of ok being false means that the server is shutting down.
Shutdown(std::move(self), "OnRequestDelivered");
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, service_, load_reporter_);
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
Shutdown(std::move(self), "OnRequestDelivered");
return;
}
next_inbound_ =
CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Read(&request_, &next_inbound_);
}
// LB ID is unique for each load reporting stream.
lb_id_ = load_reporter_->GenerateLbId();
gpr_log(GPR_INFO,
"[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
"Start reading the initial request...",
service_, lb_id_.c_str(), this);
}
// Runs when a Read() on the stream completes.
//
// The first successful read must carry the initial request: its hostname, load
// key and report interval are recorded, the stream is registered with the load
// reporter, the first report is sent, and a second Read() is issued. That
// second read is expected to fail (the balancer half-closes the stream); a
// failed read, a missing initial request, or any further request shuts the
// handler down.
//
// |self| keeps the handler alive for the duration of the callback and is
// handed on to whichever tag or Shutdown() call comes next.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok || shutdown_) {
    if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
      // The client may have half-closed the stream or the stream is broken.
      gpr_log(GPR_INFO,
              "[LRS %p] Failed reading the initial request from the stream "
              "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
              service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
              static_cast<int>(is_cancelled_));
    }
    Shutdown(std::move(self), "OnReadDone");
    return;
  }
  // We only receive one request, which is the initial request.
  if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
    // Another request received! This violates the spec.
    gpr_log(GPR_ERROR,
            "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
            service_, lb_id_.c_str(), this);
    Shutdown(std::move(self), "OnReadDone+second_request");
    return;
  }
  if (!request_.has_initial_request()) {
    Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
    return;
  }
  call_status_ = INITIAL_REQUEST_RECEIVED;
  const auto& initial_request = request_.initial_request();
  load_balanced_hostname_ = initial_request.load_balanced_hostname();
  load_key_ = initial_request.load_key();
  load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
                                      load_key_);
  const auto& load_report_interval = initial_request.load_report_interval();
  // Convert to milliseconds: 1 s = 1000 ms and 1 ms = 1,000,000 ns. (Dividing
  // the nanos by 1000 would yield microseconds and inflate the interval.)
  load_report_interval_ms_ =
      static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
                            load_report_interval.nanos() / 1000000);
  // uint64_t is not `unsigned long` on every platform, so print it through an
  // explicit `unsigned long long` to keep the format and argument in sync.
  gpr_log(GPR_INFO,
          "[LRS %p] Initial request received. Start load reporting (load "
          "balanced host: %s, interval: %llu ms, lb_id_: %s, handler: %p)...",
          service_, load_balanced_hostname_.c_str(),
          static_cast<unsigned long long>(load_report_interval_ms_),
          lb_id_.c_str(), this);
  // Pass a copy of |self|: the original is still needed for the read below.
  SendReport(self, true /* ok */);
  // Expect this read to fail.
  {
    std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Shutdown() takes cq_shutdown_mu_ itself, so let go of it first.
      lock.unlock();
      Shutdown(std::move(self), "OnReadDone");
      return;
    }
    next_inbound_ =
        CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    stream_.Read(&request_, &next_inbound_);
  }
}
void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
std::shared_ptr<ReportLoadHandler> self, bool ok) {
if (!ok || shutdown_) {
Shutdown(std::move(self), "ScheduleNextReport");
return;
}
auto next_report_time = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
Shutdown(std::move(self), "ScheduleNextReport");
return;
}
next_outbound_ =
CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
// TODO(juanlishen): Improve the Alarm implementation to reuse a single
// instance for multiple events.
next_report_alarm_.reset(new Alarm);
next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
}
gpr_log(GPR_DEBUG,
"[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
service_, lb_id_.c_str(), this);
}
void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
std::shared_ptr<ReportLoadHandler> self, bool ok) {
if (!ok || shutdown_) {
Shutdown(std::move(self), "SendReport");
return;
}
::grpc::lb::v1::LoadReportResponse response;
auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
response.mutable_load()->Swap(&loads);
auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
response.mutable_load_balancing_feedback()->Swap(&feedback);
if (call_status_ < INITIAL_RESPONSE_SENT) {
auto initial_response = response.mutable_initial_response();
initial_response->set_load_balancer_id(lb_id_);
initial_response->set_implementation_id(
::grpc::lb::v1::InitialLoadReportResponse::CPP);
initial_response->set_server_version(kVersion);
call_status_ = INITIAL_RESPONSE_SENT;
}
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
Shutdown(std::move(self), "SendReport");
return;
}
next_outbound_ =
CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Write(response, &next_outbound_);
gpr_log(GPR_INFO,
"[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
"count: %d)...",
service_, lb_id_.c_str(), this, response.load().size());
}
}
// Runs when the AsyncNotifyWhenDone() tag fires. Records that the call is done
// and whether it was cancelled, then shuts the handler down.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  GPR_ASSERT(ok);
  done_notified_ = true;
  // Only ever flips the flag to true; an earlier true value is kept.
  is_cancelled_ |= ctx_.IsCancelled();
  gpr_log(GPR_INFO,
          "[LRS %p] Load reporting call is notified done (handler: %p, "
          "is_cancelled: %d).",
          service_, this, static_cast<int>(is_cancelled_));
  Shutdown(std::move(self), "OnDoneNotified");
}
// Shuts this handler down. May be called several times.
//
// The first call marks the handler as shut down and, if the initial request
// had been received, reports the stream as closed and cancels any pending
// report alarm. Every call additionally tries to Finish() the stream, provided
// the call was delivered, Finish() has not been issued yet, and the service's
// completion queue is still accepting tags.
//
// |self| keeps the handler alive; it is moved into the finish tag when
// Finish() is issued. |reason| is only used for logging.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
    std::shared_ptr<ReportLoadHandler> self, const char* reason) {
  if (!shutdown_) {
    gpr_log(GPR_INFO,
            "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
            "reason: %s).",
            service_, lb_id_.c_str(), this, reason);
    shutdown_ = true;
    if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
      load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
      // The alarm is only created by ScheduleNextReport(), i.e. after the
      // first report write has completed. Shutdown can happen before that
      // (e.g. the call is cancelled while the first write is pending), in
      // which case there is nothing to cancel.
      if (next_report_alarm_ != nullptr) next_report_alarm_->Cancel();
    }
  }
  // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
  // try to Finish() every time we are in Shutdown().
  if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
    std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
    if (!service_->shutdown_) {
      on_finish_done_ =
          CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
                                std::placeholders::_1, std::placeholders::_2),
                      std::move(self));
      // TODO(juanlishen): Maybe add a message proto for the client to
      // explicitly cancel the stream so that we can return OK status in such
      // cases.
      stream_.Finish(Status::CANCELLED, &on_finish_done_);
      call_status_ = FINISH_CALLED;
    }
  }
}
// Runs when Finish() completes. Only logs on success; |self| is dropped when
// the function returns.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok) return;
  gpr_log(GPR_INFO,
          "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
          service_, lb_id_.c_str(), this);
}
} // namespace load_reporter
} // namespace grpc

@ -0,0 +1,194 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
namespace grpc {
namespace load_reporter {
// Async load reporting service. It's mainly responsible for controlling the
// procedure of incoming requests. The real business logic is handed off to the
// LoadReporter. There should be at most one instance of this service on a
// server to avoid spreading the load data into multiple places.
class LoadReporterAsyncServiceImpl
: public grpc::lb::v1::LoadReporter::AsyncService {
public:
// Takes ownership of the completion queue passed in.
explicit LoadReporterAsyncServiceImpl(
std::unique_ptr<ServerCompletionQueue> cq);
~LoadReporterAsyncServiceImpl();
// Starts the working thread.
void StartThread();
// Not copyable nor movable.
LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
delete;
private:
class ReportLoadHandler;
// A tag that can be called with a bool argument. It's tailored for
// ReportLoadHandler's use. Before being used, it should be constructed with a
// method of ReportLoadHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function can
// only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
public:
using HandlerFunction =
std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
// Constructs an empty tag that holds neither a function nor a handler.
CallableTag() {}
// Both |func| and |handler| must be non-null.
CallableTag(HandlerFunction func,
std::shared_ptr<ReportLoadHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
}
// Runs the tag. This should be called only once. The handler is no longer
// owned by this tag after this method is invoked.
void Run(bool ok);
// Releases and returns the shared pointer to the handler.
std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
return std::move(handler_);
}
private:
HandlerFunction handler_function_ = nullptr;
std::shared_ptr<ReportLoadHandler> handler_;
};
// Each handler takes care of one load reporting stream. It contains
// per-stream data and it will access the members of the parent class (i.e.,
// LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
class ReportLoadHandler {
public:
// Instantiates a ReportLoadHandler and requests the next load reporting
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
LoadReporterAsyncServiceImpl* service,
LoadReporter* load_reporter);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
ReportLoadHandler(ServerCompletionQueue* cq,
LoadReporterAsyncServiceImpl* service,
LoadReporter* load_reporter);
private:
// After the handler has a call request delivered, it starts reading the
// initial request. Also, a new handler is spawned so that we can keep
// servicing future calls.
void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
// The first Read() is expected to succeed, after which the handler starts
// sending load reports back to the balancer. The second Read() is
// expected to fail, which happens when the balancer half-closes the
// stream to signal that it's no longer interested in the load reports. For
// the latter case, the handler will then close the stream.
void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
// The report sending operations are sequential as: send report -> send
// done, schedule the next send -> waiting for the alarm to fire -> alarm
// fires, send report -> ...
void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Marks the handler as shut down and tries to Finish() the stream. |reason|
// is only used for logging.
void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
// The key fields of the stream.
grpc::string lb_id_;
grpc::string load_balanced_hostname_;
grpc::string load_key_;
// Reporting interval in milliseconds, taken from the initial request. It has
// no initializer, so it is indeterminate until that request is read.
uint64_t load_report_interval_ms_;
// The data for RPC communication with the load reportee.
ServerContext ctx_;
::grpc::lb::v1::LoadReportRequest request_;
// The members passed down from LoadReporterAsyncServiceImpl.
ServerCompletionQueue* cq_;
LoadReporterAsyncServiceImpl* service_;
LoadReporter* load_reporter_;
ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
::grpc::lb::v1::LoadReportRequest>
stream_;
// The status of the RPC progress.
enum CallStatus {
WAITING_FOR_DELIVERY,
DELIVERED,
INITIAL_REQUEST_RECEIVED,
INITIAL_RESPONSE_SENT,
FINISH_CALLED
} call_status_;
// Set by the first Shutdown() call on this handler.
bool shutdown_{false};
// Both are recorded by OnDoneNotified().
bool done_notified_{false};
bool is_cancelled_{false};
// The tags handed to the completion queue. Each one holds a reference to the
// handler while it is pending.
CallableTag on_done_notified_;
CallableTag on_finish_done_;
CallableTag next_inbound_;
CallableTag next_outbound_;
// Created by ScheduleNextReport(), so it stays null until the first report
// write has completed.
std::unique_ptr<Alarm> next_report_alarm_;
};
// Handles the incoming requests and drives the completion queue in a loop.
static void Work(void* arg);
// Schedules the next data fetching from Census and LB feedback sampling.
void ScheduleNextFetchAndSample();
// Fetches data from Census and samples LB feedback.
void FetchAndSample(bool ok);
std::unique_ptr<ServerCompletionQueue> cq_;
// To synchronize the operations related to shutdown state of cq_, so that we
// don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_;
// Checked under cq_shutdown_mu_ before any new tag is enqueued into cq_.
std::atomic_bool shutdown_{false};
// Started by StartThread().
std::unique_ptr<::grpc_core::Thread> thread_;
std::unique_ptr<LoadReporter> load_reporter_;
// Re-created every time the next fetch-and-sample is scheduled.
std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
};
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H

@ -0,0 +1,41 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <grpcpp/ext/server_load_reporting.h>
#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
namespace grpc {
namespace load_reporter {
namespace experimental {
// Turns on load reporting by setting the GRPC_ARG_ENABLE_LOAD_REPORTING
// channel argument to 1.
void LoadReportingServiceServerBuilderOption::UpdateArguments(
    ::grpc::ChannelArguments* args) {
  constexpr int kEnabled = 1;
  args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, kEnabled);
}
// Appends the load reporting service plugin to |plugins|.
void LoadReportingServiceServerBuilderOption::UpdatePlugins(
    std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) {
  // Wrap the new plugin in a unique_ptr before touching the vector: passing a
  // raw `new` expression to emplace_back would leak the object if growing the
  // vector threw.
  plugins->push_back(std::unique_ptr<::grpc::ServerBuilderPlugin>(
      new LoadReportingServiceServerBuilderPlugin()));
}
} // namespace experimental
} // namespace load_reporter
} // namespace grpc

@ -0,0 +1,60 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
#include <grpcpp/impl/server_initializer.h>
namespace grpc {
namespace load_reporter {
// Returns whether the service has synchronous methods; false while no service
// is held.
bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const {
  return service_ != nullptr && service_->has_synchronous_methods();
}
// Returns whether the service has async methods; false while no service is
// held.
bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const {
  return service_ != nullptr && service_->has_async_methods();
}
// Adds a dedicated completion queue to |builder| and creates the load
// reporting service that owns it.
void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder(
    grpc::ServerBuilder* builder) {
  service_ = std::make_shared<LoadReporterAsyncServiceImpl>(
      builder->AddCompletionQueue());
}
// Registers the load reporting service with the server initializer, passing
// it the shared pointer held in service_.
void LoadReportingServiceServerBuilderPlugin::InitServer(
grpc::ServerInitializer* si) {
si->RegisterService(service_);
}
// Starts the service's working thread and then drops this plugin's reference
// to the service. |si| is not used.
void LoadReportingServiceServerBuilderPlugin::Finish(
grpc::ServerInitializer* si) {
service_->StartThread();
service_.reset();
}
} // namespace load_reporter
} // namespace grpc

@ -0,0 +1,62 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
#include <grpc/support/port_platform.h>
#include <grpcpp/impl/server_builder_plugin.h>
#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
namespace grpc {
namespace load_reporter {
// The plugin that registers and starts load reporting service when starting a
// server.
class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin {
public:
~LoadReportingServiceServerBuilderPlugin() override = default;
grpc::string name() override { return "load_reporting_service"; }
// Creates a load reporting service.
void UpdateServerBuilder(grpc::ServerBuilder* builder) override;
// Registers the load reporter service.
void InitServer(grpc::ServerInitializer* si) override;
// Starts the load reporter service.
void Finish(grpc::ServerInitializer* si) override;
// Intentionally a no-op.
void ChangeArguments(const grpc::string& name, void* value) override {}
// Intentionally a no-op.
void UpdateChannelArguments(grpc::ChannelArguments* args) override {}
// Both return false while no service is held; otherwise they forward to the
// service.
bool has_sync_methods() const override;
bool has_async_methods() const override;
private:
// Created in UpdateServerBuilder() and released in Finish().
std::shared_ptr<LoadReporterAsyncServiceImpl> service_;
};
// Factory returning a server builder plugin.
// NOTE(review): no definition of this function is visible alongside the
// plugin's .cc in this change - confirm it is defined somewhere, or drop the
// declaration.
std::unique_ptr<grpc::ServerBuilderPlugin>
CreateLoadReportingServiceServerBuilderPlugin();
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H

@ -0,0 +1,45 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/impl/codegen/port_platform.h>
#include <grpcpp/ext/server_load_reporting.h>
#include <grpc/support/log.h>
namespace grpc {
namespace load_reporter {
namespace experimental {
// Attaches a call metric to |ctx| as trailing metadata under
// GRPC_LB_COST_MD_KEY. The metadata value is the raw bytes of |cost_value|
// immediately followed by the bytes of |cost_name|.
//
// Values for which std::isnormal() is false are rejected with an error log.
// NOTE(review): that also rejects 0.0 and subnormal values, not only NaN and
// infinity - confirm this is intended.
void AddLoadReportingCost(grpc::ServerContext* ctx,
                          const grpc::string& cost_name, double cost_value) {
  if (!std::isnormal(cost_value)) {
    gpr_log(GPR_ERROR, "Call metric value is not normal.");
    return;
  }
  grpc::string payload(sizeof(cost_value) + cost_name.size(), '\0');
  char* const out = &payload[0];
  memcpy(out, &cost_value, sizeof(cost_value));
  memcpy(out + sizeof(cost_value), cost_name.data(), cost_name.size());
  ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, payload);
}
} // namespace experimental
} // namespace load_reporter
} // namespace grpc

@ -114,6 +114,10 @@ message Load {
// num_calls_in_progress is the only valid entry. If in_progress_report is not
// set, num_calls_in_progress will be ignored. If in_progress_report is set,
// fields other than num_calls_in_progress and orphaned_load will be ignored.
// TODO(juanlishen): A Load is either an in_progress_report or not. We should
// make this explicit in the hierarchy. From the log, I see in_progress_report_
// has a random num_calls_in_progress_ when not set, which might lead to bugs
// when the balancer processes the load report.
oneof in_progress_report {
// The number of calls in progress (instantaneously) per load balancer id.
int64 num_calls_in_progress = 5;

@ -14,7 +14,7 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_cc_binary")
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
grpc_package(
name = "test/cpp/end2end",
@ -429,6 +429,20 @@ grpc_cc_binary(
],
)
grpc_cc_test(
name = "server_load_reporting_end2end_test",
srcs = ["server_load_reporting_end2end_test.cc"],
external_deps = [
"gtest",
"gmock",
],
deps = [
"//:grpcpp_server_load_reporting",
"//src/proto/grpc/testing:echo_proto",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "shutdown_test",
srcs = ["shutdown_test.cc"],

@ -0,0 +1,191 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpc++/grpc++.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpcpp/ext/server_load_reporting.h>
#include <grpcpp/server_builder.h>
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
namespace grpc {
namespace testing {
namespace {
constexpr double kMetricValue = 3.1415;
constexpr char kMetricName[] = "METRIC_PI";
// Different messages result in different response statuses. For simplicity in
// computing request bytes, the message sizes should be the same.
const char kOkMessage[] = "hello";
const char kServerErrorMessage[] = "sverr";
const char kClientErrorMessage[] = "clerr";
// Echo service used by the test. The request message selects the outcome: the
// two error messages produce UNKNOWN and FAILED_PRECONDITION respectively, and
// anything else is echoed back with a call metric attached.
class EchoTestServiceImpl : public EchoTestService::Service {
 public:
  ~EchoTestServiceImpl() override {}

  Status Echo(ServerContext* context, const EchoRequest* request,
              EchoResponse* response) override {
    const auto& message = request->message();
    if (message == kServerErrorMessage) {
      return Status(StatusCode::UNKNOWN, "Server error requested");
    }
    if (message == kClientErrorMessage) {
      return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
    }
    response->set_message(message);
    // Successful calls report one metric so the test can look for it in the
    // load report.
    ::grpc::load_reporter::experimental::AddLoadReportingCost(
        context, kMetricName, kMetricValue);
    return Status::OK;
  }
};
// Test fixture that runs an in-process server with the echo service and the
// load reporting service option enabled.
class ServerLoadReportingEnd2endTest : public ::testing::Test {
 protected:
  // Builds and starts the server on an unused local port, then parks a thread
  // in Server::Wait().
  void SetUp() override {
    server_address_ =
        "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
    server_ =
        ServerBuilder()
            .AddListeningPort(server_address_, InsecureServerCredentials())
            .RegisterService(&echo_service_)
            .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(
                new ::grpc::load_reporter::experimental::
                    LoadReportingServiceServerBuilderOption()))
            .BuildAndStart();
    server_thread_ =
        std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
  }

  // Blocks until the server is shut down.
  void RunServerLoop() { server_->Wait(); }

  // Shuts the server down, which unblocks RunServerLoop(), then joins the
  // thread.
  void TearDown() override {
    server_->Shutdown();
    server_thread_.join();
  }

  // Makes |num_requests| Echo calls carrying |message| and checks the status
  // that the message is expected to produce. If |lb_id| + |lb_tag| is
  // non-empty, it is attached to every call as the LB token metadata.
  void ClientMakeEchoCalls(const grpc::string& lb_id,
                           const grpc::string& lb_tag,
                           const grpc::string& message, size_t num_requests) {
    auto stub = EchoTestService::NewStub(
        CreateChannel(server_address_, InsecureChannelCredentials()));
    grpc::string lb_token = lb_id + lb_tag;
    // The index has the same type as |num_requests| to avoid a signed/unsigned
    // comparison.
    for (size_t i = 0; i < num_requests; ++i) {
      ClientContext ctx;
      if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
      EchoRequest request;
      EchoResponse response;
      request.set_message(message);
      Status status = stub->Echo(&ctx, request, &response);
      if (message == kOkMessage) {
        ASSERT_EQ(status.error_code(), StatusCode::OK);
        ASSERT_EQ(request.message(), response.message());
      } else if (message == kServerErrorMessage) {
        ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
      } else if (message == kClientErrorMessage) {
        ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
      }
    }
  }

  grpc::string server_address_;
  std::unique_ptr<Server> server_;
  std::thread server_thread_;
  EchoTestServiceImpl echo_service_;
};
// Runs only the fixture's SetUp() and TearDown(): the server is started with
// the load reporting option and shut down without any call being made.
TEST_F(ServerLoadReportingEnd2endTest, NoCall) {}
// Opens a load reporting stream, makes one successful Echo call tagged with
// the returned LB ID, and checks the first non-empty load report. The stream
// is then half-closed and is expected to finish with CANCELLED.
TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel);
  ClientContext ctx;
  auto stream = stub->ReportLoad(&ctx);
  ::grpc::lb::v1::LoadReportRequest request;
  request.mutable_initial_request()->set_load_balanced_hostname(
      server_address_);
  request.mutable_initial_request()->set_load_key("LOAD_KEY");
  request.mutable_initial_request()
      ->mutable_load_report_interval()
      ->set_seconds(5);
  ASSERT_TRUE(stream->Write(request));
  gpr_log(GPR_INFO, "Initial request sent.");
  ::grpc::lb::v1::LoadReportResponse response;
  ASSERT_TRUE(stream->Read(&response));
  // Copy the ID: |response| is overwritten by the reads below.
  const grpc::string lb_id = response.initial_response().load_balancer_id();
  gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
  ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
  while (true) {
    // Fail the test if the stream breaks; ignoring the result here would spin
    // forever on a dead stream.
    ASSERT_TRUE(stream->Read(&response));
    if (!response.load().empty()) {
      ASSERT_EQ(response.load().size(), 3);
      for (const auto& load : response.load()) {
        if (load.in_progress_report_case()) {
          // The special load record that reports the number of in-progress
          // calls.
          ASSERT_EQ(load.num_calls_in_progress(), 1);
        } else if (load.orphaned_load_case()) {
          // The call from the balancer doesn't have any valid LB token.
          ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
          ASSERT_EQ(load.num_calls_started(), 1);
          ASSERT_EQ(load.num_calls_finished_without_error(), 0);
          ASSERT_EQ(load.num_calls_finished_with_error(), 0);
        } else {
          // This corresponds to the calls from the client.
          ASSERT_EQ(load.num_calls_started(), 1);
          ASSERT_EQ(load.num_calls_finished_without_error(), 1);
          ASSERT_EQ(load.num_calls_finished_with_error(), 0);
          ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
          ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
          ASSERT_EQ(load.metric_data().size(), 1);
          ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
          ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
                    1);
          ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
                    kMetricValue);
        }
      }
      break;
    }
  }
  stream->WritesDone();
  ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
}
// TODO(juanlishen): Add more tests.
} // namespace
} // namespace testing
} // namespace grpc
// Test entry point: lets googletest consume its command-line flags, then runs
// every registered test and returns the result as the exit code.
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -42,6 +42,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:lb_load_reporter",
"//:lb_server_load_reporting_filter",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],

@ -25,6 +25,7 @@
#include <grpc/grpc.h>
#include <gtest/gtest.h>
#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
@ -123,6 +124,14 @@ class LoadReporterTest : public ::testing::Test {
private:
void SetUp() override {
// Access the measures to make them valid.
::grpc::load_reporter::MeasureStartCount();
::grpc::load_reporter::MeasureEndCount();
::grpc::load_reporter::MeasureEndBytesSent();
::grpc::load_reporter::MeasureEndBytesReceived();
::grpc::load_reporter::MeasureEndLatencyMs();
::grpc::load_reporter::MeasureOtherCallMetric();
// Set up the load reporter.
auto mock_cpu = new MockCpuStatsProvider();
auto mock_census = new MockCensusViewProvider();
// Prepare the initial CPU stats data. Note that the expectation should be

Loading…
Cancel
Save