server: per-rpc backend metric reporting (#29621)

Users can now report per-rpc metrics from servers to clients.
pull/29763/head
Nicolas Noble 3 years ago committed by GitHub
parent eed6711be9
commit 667691c499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 77
      BUILD
  2. 11
      CMakeLists.txt
  3. 14
      build_autogenerated.yaml
  4. 4
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 2
      grpc.gyp
  8. 94
      include/grpcpp/ext/call_metric_recorder.h
  9. 3
      include/grpcpp/impl/codegen/interceptor.h
  10. 18
      include/grpcpp/impl/codegen/server_context.h
  11. 5
      include/grpcpp/server_builder.h
  12. 1
      package.xml
  13. 12
      src/core/ext/filters/client_channel/backend_metric.cc
  14. 11
      src/core/ext/filters/client_channel/backend_metric.h
  15. 11
      src/core/ext/filters/client_channel/client_channel.cc
  16. 4
      src/core/ext/filters/client_channel/client_channel.h
  17. 22
      src/core/ext/filters/client_channel/lb_policy.h
  18. 49
      src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
  19. 13
      src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc
  20. 5
      src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h
  21. 8
      src/core/lib/transport/metadata_batch.h
  22. 116
      src/cpp/server/orca/call_metric_recorder.cc
  23. 79
      src/cpp/server/orca/orca_interceptor.cc
  24. 49
      src/cpp/server/orca/orca_interceptor.h
  25. 10
      src/cpp/server/server_builder.cc
  26. 12
      src/cpp/server/server_context.cc
  27. 3
      test/core/util/test_lb_policies.cc
  28. 8
      test/core/util/test_lb_policies.h
  29. 5
      test/cpp/end2end/BUILD
  30. 29
      test/cpp/end2end/client_lb_end2end_test.cc
  31. 5
      test/cpp/end2end/server_interceptors_end2end_test.cc
  32. 1
      tools/doxygen/Doxyfile.c++
  33. 3
      tools/doxygen/Doxyfile.c++.internal
  34. 1
      tools/doxygen/Doxyfile.core.internal

77
BUILD

@ -2900,6 +2900,7 @@ grpc_cc_library(
"error",
"gpr_base",
"gpr_codegen",
"grpc_backend_metric_data",
"grpc_base",
"grpc_client_authority_filter",
"grpc_codegen",
@ -3246,6 +3247,20 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_backend_metric_data",
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h",
],
external_deps = [
"absl/strings",
],
language = "c++",
deps = [
"gpr",
],
)
grpc_cc_library(
name = "grpc_lb_policy_rls",
srcs = [
@ -5532,6 +5547,7 @@ grpc_cc_library(
public_hdrs = GRPCXX_PUBLIC_HDRS,
visibility = ["@grpc:alt_grpc++_base_legacy"],
deps = [
"arena",
"channel_init",
"config",
"gpr_base",
@ -5547,6 +5563,7 @@ grpc_cc_library(
"grpc_service_config_impl",
"grpc_trace",
"grpc_transport_inproc",
"grpcpp_call_metric_recorder",
"iomgr_timer",
"ref_counted",
"ref_counted_ptr",
@ -5574,6 +5591,7 @@ grpc_cc_library(
tags = ["avoid_dep"],
visibility = ["@grpc:alt_grpc++_base_unsecure_legacy"],
deps = [
"arena",
"channel_init",
"config",
"gpr_base",
@ -5590,6 +5608,7 @@ grpc_cc_library(
"grpc_trace",
"grpc_transport_inproc",
"grpc_unsecure",
"grpcpp_call_metric_recorder",
"iomgr_timer",
"ref_counted",
"ref_counted_ptr",
@ -5757,7 +5776,62 @@ grpc_cc_library(
)
grpc_cc_library(
name = "grpcpp_orca",
name = "grpcpp_call_metric_recorder",
srcs = [
"src/cpp/server/orca/call_metric_recorder.cc",
],
external_deps = [
"upb_lib",
"absl/memory",
"absl/strings",
"absl/types:optional",
],
language = "c++",
public_hdrs = [
"include/grpcpp/ext/call_metric_recorder.h",
],
visibility = ["@grpc:public"],
deps = [
"arena",
"grpc++_codegen_base",
"grpc++_internal_hdrs_only",
"grpc++_public_hdrs",
"grpc_backend_metric_data",
"xds_orca_upb",
],
)
grpc_cc_library(
name = "grpcpp_orca_interceptor",
srcs = [
"src/cpp/server/orca/orca_interceptor.cc",
],
hdrs = [
"src/cpp/server/orca/orca_interceptor.h",
],
external_deps = [
"upb_lib",
"absl/memory",
"absl/strings",
"absl/types:optional",
],
language = "c++",
visibility = ["@grpc:public"],
deps = [
"grpc++",
"grpc++_codegen_base",
"grpc_base",
"grpcpp_call_metric_recorder",
"protobuf_duration_upb",
"ref_counted",
"time",
"xds_orca_service_upb",
"xds_orca_upb",
],
)
grpc_cc_library(
name = "grpcpp_orca_service",
srcs = [
"src/cpp/server/orca/orca_service.cc",
],
@ -5766,6 +5840,7 @@ grpc_cc_library(
"absl/time",
"absl/types:optional",
"upb_lib",
"absl/memory",
],
language = "c++",
public_hdrs = [

11
CMakeLists.txt generated

@ -3041,6 +3041,7 @@ add_library(grpc++
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/insecure_server_credentials.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/secure_server_credentials.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
@ -3191,6 +3192,7 @@ foreach(_hdr
include/grpcpp/create_channel.h
include/grpcpp/create_channel_binder.h
include/grpcpp/create_channel_posix.h
include/grpcpp/ext/call_metric_recorder.h
include/grpcpp/ext/health_check_service_server_builder_option.h
include/grpcpp/generic/async_generic_service.h
include/grpcpp/generic/generic_stub.h
@ -3713,6 +3715,7 @@ add_library(grpc++_unsecure
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/insecure_server_credentials.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -3860,6 +3863,7 @@ foreach(_hdr
include/grpcpp/completion_queue.h
include/grpcpp/create_channel.h
include/grpcpp/create_channel_posix.h
include/grpcpp/ext/call_metric_recorder.h
include/grpcpp/ext/health_check_service_server_builder_option.h
include/grpcpp/generic/async_generic_service.h
include/grpcpp/generic/generic_stub.h
@ -8203,6 +8207,7 @@ add_executable(binder_transport_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -9383,6 +9388,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
src/cpp/server/orca/orca_interceptor.cc
src/cpp/server/orca/orca_service.cc
test/core/util/test_lb_policies.cc
test/cpp/end2end/client_lb_end2end_test.cc
@ -10110,6 +10116,7 @@ add_executable(endpoint_binder_pool_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -10528,6 +10535,7 @@ add_executable(fake_binder_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -16990,6 +16998,7 @@ add_executable(transport_stream_receiver_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -17336,6 +17345,7 @@ add_executable(wire_reader_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
@ -17427,6 +17437,7 @@ add_executable(wire_writer_test
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/orca/call_metric_recorder.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc

@ -330,6 +330,7 @@ libs:
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_policy.h
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
@ -1804,6 +1805,7 @@ libs:
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_policy.h
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
@ -2546,6 +2548,7 @@ libs:
- include/grpcpp/create_channel.h
- include/grpcpp/create_channel_binder.h
- include/grpcpp/create_channel_posix.h
- include/grpcpp/ext/call_metric_recorder.h
- include/grpcpp/ext/health_check_service_server_builder_option.h
- include/grpcpp/generic/async_generic_service.h
- include/grpcpp/generic/generic_stub.h
@ -2739,6 +2742,7 @@ libs:
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/insecure_server_credentials.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/secure_server_credentials.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
@ -2945,6 +2949,7 @@ libs:
- include/grpcpp/completion_queue.h
- include/grpcpp/create_channel.h
- include/grpcpp/create_channel_posix.h
- include/grpcpp/ext/call_metric_recorder.h
- include/grpcpp/ext/health_check_service_server_builder_option.h
- include/grpcpp/generic/async_generic_service.h
- include/grpcpp/generic/generic_stub.h
@ -3086,6 +3091,7 @@ libs:
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/insecure_server_credentials.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -4823,6 +4829,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -5259,6 +5266,7 @@ targets:
run: false
language: c++
headers:
- src/cpp/server/orca/orca_interceptor.h
- test/core/util/test_lb_policies.h
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/test_service_impl.h
@ -5268,6 +5276,7 @@ targets:
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- src/cpp/server/orca/orca_interceptor.cc
- src/cpp/server/orca/orca_service.cc
- test/core/util/test_lb_policies.cc
- test/cpp/end2end/client_lb_end2end_test.cc
@ -5557,6 +5566,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -5781,6 +5791,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -8427,6 +8438,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -8619,6 +8631,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
@ -8713,6 +8726,7 @@ targets:
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/orca/call_metric_recorder.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc

4
gRPC-C++.podspec generated

@ -86,6 +86,7 @@ Pod::Spec.new do |s|
'include/grpcpp/create_channel.h',
'include/grpcpp/create_channel_binder.h',
'include/grpcpp/create_channel_posix.h',
'include/grpcpp/ext/call_metric_recorder.h',
'include/grpcpp/ext/health_check_service_server_builder_option.h',
'include/grpcpp/generic/async_generic_service.h',
'include/grpcpp/generic/generic_stub.h',
@ -233,6 +234,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
@ -987,6 +989,7 @@ Pod::Spec.new do |s|
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',
'src/cpp/server/insecure_server_credentials.cc',
'src/cpp/server/orca/call_metric_recorder.cc',
'src/cpp/server/secure_server_credentials.cc',
'src/cpp/server/secure_server_credentials.h',
'src/cpp/server/server_builder.cc',
@ -1065,6 +1068,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',

2
gRPC-Core.podspec generated

@ -229,6 +229,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
@ -1686,6 +1687,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',

1
grpc.gemspec generated

@ -142,6 +142,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc )

2
grpc.gyp generated

@ -1511,6 +1511,7 @@
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',
'src/cpp/server/insecure_server_credentials.cc',
'src/cpp/server/orca/call_metric_recorder.cc',
'src/cpp/server/secure_server_credentials.cc',
'src/cpp/server/server_builder.cc',
'src/cpp/server/server_callback.cc',
@ -1636,6 +1637,7 @@
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',
'src/cpp/server/insecure_server_credentials.cc',
'src/cpp/server/orca/call_metric_recorder.cc',
'src/cpp/server/server_builder.cc',
'src/cpp/server/server_callback.cc',
'src/cpp/server/server_cc.cc',

@ -0,0 +1,94 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_EXT_CALL_METRIC_RECORDER_H
#define GRPCPP_EXT_CALL_METRIC_RECORDER_H
#include <memory>
#include <string>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpcpp/impl/codegen/slice.h>
#include <grpcpp/impl/codegen/sync.h>
namespace grpc_core {
class Arena;
struct BackendMetricData;
} // namespace grpc_core
namespace grpc {
class ServerBuilder;
namespace experimental {
class OrcaServerInterceptor;
// Registers the per-rpc orca load reporter into the \a ServerBuilder.
// Once this is done, the server will automatically send the load metrics
// after each RPC as they were reported. In order to report load metrics,
// call the \a ServerContext::ExperimentalGetCallMetricRecorder() method to
// retrieve the recorder for the current call.
void EnableCallMetricRecording(ServerBuilder*);
/// Records call metrics for the purpose of load balancing.
/// During an RPC, call \a ServerContext::ExperimentalGetCallMetricRecorder()
/// method to retrive the recorder for the current call.
class CallMetricRecorder {
public:
explicit CallMetricRecorder(grpc_core::Arena* arena);
~CallMetricRecorder();
/// Records a call metric measurement for CPU utilization.
/// Multiple calls to this method will override the stored value.
CallMetricRecorder& RecordCpuUtilizationMetric(double value);
/// Records a call metric measurement for memory utilization.
/// Multiple calls to this method will override the stored value.
CallMetricRecorder& RecordMemoryUtilizationMetric(double value);
/// Records a call metric measurement for utilization.
/// Multiple calls to this method with the same name will
/// override the corresponding stored value. The lifetime of the
/// name string needs to be longer than the lifetime of the RPC
/// itself, since it's going to be sent as trailers after the RPC
/// finishes. It is assumed the strings are common names that
/// are global constants.
CallMetricRecorder& RecordUtilizationMetric(string_ref name, double value);
/// Records a call metric measurement for request cost.
/// Multiple calls to this method with the same name will
/// override the corresponding stored value. The lifetime of the
/// name string needs to be longer than the lifetime of the RPC
/// itself, since it's going to be sent as trailers after the RPC
/// finishes. It is assumed the strings are common names that
/// are global constants.
CallMetricRecorder& RecordRequestCostMetric(string_ref name, double value);
private:
absl::optional<std::string> CreateSerializedReport();
internal::Mutex mu_;
grpc_core::BackendMetricData* backend_metric_data_ ABSL_GUARDED_BY(&mu_);
friend class experimental::OrcaServerInterceptor;
};
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_EXT_CALL_METRIC_RECORDER_H

@ -21,13 +21,16 @@
// IWYU pragma: private, include <grpcpp/support/interceptor.h>
#include <map>
#include <memory>
#include <string>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/metadata_map.h>
#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {

@ -117,6 +117,11 @@ class ServerContextTestSpouse;
class DefaultReactorTestPeer;
} // namespace testing
namespace experimental {
class OrcaServerInterceptor;
class CallMetricRecorder;
} // namespace experimental
/// Base class of ServerContext.
class ServerContextBase {
public:
@ -283,6 +288,15 @@ class ServerContextBase {
/// Applications never need to call this method.
grpc_call* c_call() { return call_.call; }
/// Get the \a CallMetricRecorder object for the current RPC.
/// Use it to record metrics during your RPC to send back to the
/// client in order to make load balancing decisions. This will
/// return nullptr if the feature hasn't been enabled using
/// \a EnableCallMetricRecording.
experimental::CallMetricRecorder* ExperimentalGetCallMetricRecorder() {
return call_metric_recorder_;
}
protected:
/// Async only. Has to be called before the rpc starts.
/// Returns the tag in completion queue when the rpc finishes.
@ -388,6 +402,7 @@ class ServerContextBase {
friend class grpc::ClientContext;
friend class grpc::GenericServerContext;
friend class grpc::GenericCallbackServerContext;
friend class grpc::experimental::OrcaServerInterceptor;
/// Prevent copying.
ServerContextBase(const ServerContextBase&);
@ -429,6 +444,8 @@ class ServerContextBase {
}
}
void CreateCallMetricRecorder();
struct CallWrapper {
~CallWrapper();
@ -466,6 +483,7 @@ class ServerContextBase {
grpc::experimental::ServerRpcInfo* rpc_info_ = nullptr;
RpcAllocatorState* message_allocator_state_ = nullptr;
ContextAllocator* context_allocator_ = nullptr;
experimental::CallMetricRecorder* call_metric_recorder_ = nullptr;
class Reactor : public grpc::ServerUnaryReactor {
public:

@ -59,6 +59,7 @@ class ExternalConnectionAcceptorImpl;
class CallbackGenericService;
namespace experimental {
class OrcaServerInterceptorFactory;
// EXPERIMENTAL API:
// Interface for a grpc server to build transports with connections created out
// of band.
@ -352,6 +353,7 @@ class ServerBuilder {
private:
friend class grpc::testing::ServerBuilderPluginTest;
friend class grpc::experimental::OrcaServerInterceptorFactory;
struct SyncServerSettings {
SyncServerSettings()
@ -402,6 +404,9 @@ class ServerBuilder {
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
interceptor_creators_;
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
internal_interceptor_creators_;
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors_;
grpc_server_config_fetcher* server_config_fetcher_ = nullptr;

1
package.xml generated

@ -124,6 +124,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/address_filtering.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/address_filtering.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc" role="src" />

@ -54,22 +54,20 @@ std::map<absl::string_view, double> ParseMap(
} // namespace
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
ParseBackendMetricData(absl::string_view serialized_load_report,
BackendMetricAllocatorInterface* allocator) {
const BackendMetricData* ParseBackendMetricData(
absl::string_view serialized_load_report,
BackendMetricAllocatorInterface* allocator) {
upb::Arena upb_arena;
xds_data_orca_v3_OrcaLoadReport* msg = xds_data_orca_v3_OrcaLoadReport_parse(
serialized_load_report.data(), serialized_load_report.size(),
upb_arena.ptr());
if (msg == nullptr) return nullptr;
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
backend_metric_data = allocator->AllocateBackendMetricData();
BackendMetricData* backend_metric_data =
allocator->AllocateBackendMetricData();
backend_metric_data->cpu_utilization =
xds_data_orca_v3_OrcaLoadReport_cpu_utilization(msg);
backend_metric_data->mem_utilization =
xds_data_orca_v3_OrcaLoadReport_mem_utilization(msg);
backend_metric_data->requests_per_second =
xds_data_orca_v3_OrcaLoadReport_rps(msg);
backend_metric_data->request_cost =
ParseMap<xds_data_orca_v3_OrcaLoadReport_RequestCostEntry>(
msg, xds_data_orca_v3_OrcaLoadReport_request_cost_next,

@ -23,7 +23,7 @@
#include "absl/strings/string_view.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
namespace grpc_core {
@ -31,17 +31,16 @@ class BackendMetricAllocatorInterface {
public:
virtual ~BackendMetricAllocatorInterface() = default;
virtual LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
AllocateBackendMetricData() = 0;
virtual BackendMetricData* AllocateBackendMetricData() = 0;
virtual char* AllocateString(size_t size) = 0;
};
// Parses the serialized load report and populates out.
// Returns false on error.
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
ParseBackendMetricData(absl::string_view serialized_load_report,
BackendMetricAllocatorInterface* allocator);
const BackendMetricData* ParseBackendMetricData(
absl::string_view serialized_load_report,
BackendMetricAllocatorInterface* allocator);
} // namespace grpc_core

@ -2574,7 +2574,7 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor
if (lb_call_->backend_metric_data_ == nullptr &&
lb_call_->recv_trailing_metadata_ != nullptr) {
if (const auto* md = lb_call_->recv_trailing_metadata_->get_pointer(
XEndpointLoadMetricsBinMetadata())) {
EndpointLoadMetricsBinMetadata())) {
BackendMetricAllocator allocator(lb_call_->arena_);
lb_call_->backend_metric_data_ =
ParseBackendMetricData(md->as_string_view(), &allocator);
@ -2588,10 +2588,8 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor
public:
explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
AllocateBackendMetricData() override {
return arena_->New<
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData>();
BackendMetricData* AllocateBackendMetricData() override {
return arena_->New<BackendMetricData>();
}
char* AllocateString(size_t size) override {
@ -2651,8 +2649,7 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
GRPC_ERROR_UNREF(cancel_error_);
GRPC_ERROR_UNREF(failure_error_);
if (backend_metric_data_ != nullptr) {
backend_metric_data_->LoadBalancingPolicy::BackendMetricAccessor::
BackendMetricData::~BackendMetricData();
backend_metric_data_->BackendMetricData::~BackendMetricData();
}
// Make sure there are no remaining pending batches.
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {

@ -40,6 +40,7 @@
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/call_tracer.h"
@ -501,8 +502,7 @@ class ClientChannel::LoadBalancedCall
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
backend_metric_data_ = nullptr;
const BackendMetricData* backend_metric_data_ = nullptr;
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
lb_subchannel_call_tracker_;

@ -22,7 +22,6 @@
#include <stddef.h>
#include <stdint.h>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
@ -38,6 +37,7 @@
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
@ -163,26 +163,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// SubchannelCallTrackerInterface.
class BackendMetricAccessor {
public:
// Represents backend metrics reported by the backend to the client.
struct BackendMetricData {
/// CPU utilization expressed as a fraction of available CPU resources.
double cpu_utilization;
/// Memory utilization expressed as a fraction of available memory
/// resources.
double mem_utilization;
/// Total requests per second being served by the backend. This
/// should include all services that a backend is responsible for.
uint64_t requests_per_second;
/// Application-specific requests cost metrics. Metric names are
/// determined by the application. Each value is an absolute cost
/// (e.g. 3487 bytes of storage) associated with the request.
std::map<absl::string_view, double> request_cost;
/// Application-specific resource utilization metrics. Metric names
/// are determined by the application. Each value is expressed as a
/// fraction of total resources available.
std::map<absl::string_view, double> utilization;
};
virtual ~BackendMetricAccessor() = default;
/// Returns the backend metric data returned by the server for the call,

@ -0,0 +1,49 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H
#include <grpc/support/port_platform.h>
#include <map>
#include "absl/strings/string_view.h"
namespace grpc_core {
// Represents backend metrics reported by the backend to the client.
struct BackendMetricData {
/// CPU utilization expressed as a fraction of available CPU resources.
double cpu_utilization = -1;
/// Memory utilization expressed as a fraction of available memory
/// resources.
double mem_utilization = -1;
/// Application-specific requests cost metrics. Metric names are
/// determined by the application. Each value is an absolute cost
/// (e.g. 3487 bytes of storage) associated with the request.
std::map<absl::string_view, double> request_cost;
/// Application-specific resource utilization metrics. Metric names
/// are determined by the application. Each value is expressed as a
/// fraction of total resources available.
std::map<absl::string_view, double> utilization;
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H

@ -103,9 +103,7 @@ class OrcaProducer : public Subchannel::DataProducerInterface {
void OnConnectivityStateChange(grpc_connectivity_state state);
// Called to notify watchers of a new backend metric report.
void NotifyWatchers(
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData&
backend_metric_data);
void NotifyWatchers(const BackendMetricData& backend_metric_data);
RefCountedPtr<Subchannel> subchannel_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
@ -247,8 +245,7 @@ class OrcaProducer::OrcaStreamEventHandler
explicit BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)
: producer_(std::move(producer)) {}
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
AllocateBackendMetricData() override {
BackendMetricData* AllocateBackendMetricData() override {
return &backend_metric_data_;
}
@ -274,8 +271,7 @@ class OrcaProducer::OrcaStreamEventHandler
}
WeakRefCountedPtr<OrcaProducer> producer_;
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData
backend_metric_data_;
BackendMetricData backend_metric_data_;
std::vector<UniquePtr<char>> string_storage_;
grpc_closure closure_;
};
@ -354,8 +350,7 @@ void OrcaProducer::MaybeStartStreamLocked() {
}
void OrcaProducer::NotifyWatchers(
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData&
backend_metric_data) {
const BackendMetricData& backend_metric_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace)) {
gpr_log(GPR_INFO, "OrcaProducer %p: reporting backend metrics to watchers",
this);

@ -21,7 +21,7 @@
#include <memory>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/gprpp/time.h"
@ -45,8 +45,7 @@ class OobBackendMetricWatcher {
virtual ~OobBackendMetricWatcher() = default;
virtual void OnBackendMetricReport(
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData&
backend_metric_data) = 0;
const BackendMetricData& backend_metric_data) = 0;
};
std::unique_ptr<SubchannelInterface::DataWatcherInterface>

@ -228,10 +228,10 @@ struct HostMetadata : public SimpleSliceBasedMetadata {
static absl::string_view key() { return "host"; }
};
// x-endpoint-load-metrics-bin metadata trait.
struct XEndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata {
// endpoint-load-metrics-bin metadata trait.
struct EndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static absl::string_view key() { return "x-endpoint-load-metrics-bin"; }
static absl::string_view key() { return "endpoint-load-metrics-bin"; }
};
// grpc-server-stats-bin metadata trait.
@ -1248,7 +1248,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcTimeoutMetadata, grpc_core::GrpcPreviousRpcAttemptsMetadata,
grpc_core::GrpcRetryPushbackMsMetadata, grpc_core::UserAgentMetadata,
grpc_core::GrpcMessageMetadata, grpc_core::HostMetadata,
grpc_core::XEndpointLoadMetricsBinMetadata,
grpc_core::EndpointLoadMetricsBinMetadata,
grpc_core::GrpcServerStatsBinMetadata, grpc_core::GrpcTraceBinMetadata,
grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata,
grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata,

@ -0,0 +1,116 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stddef.h>
#include <map>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "upb/upb.h"
#include "upb/upb.hpp"
#include "xds/data/orca/v3/orca_load_report.upb.h"
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/string_ref.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc {
namespace experimental {
CallMetricRecorder::CallMetricRecorder(grpc_core::Arena* arena)
: backend_metric_data_(arena->New<grpc_core::BackendMetricData>()) {}
CallMetricRecorder::~CallMetricRecorder() {
backend_metric_data_->~BackendMetricData();
}
CallMetricRecorder& CallMetricRecorder::RecordCpuUtilizationMetric(
double value) {
internal::MutexLock lock(&mu_);
backend_metric_data_->cpu_utilization = value;
return *this;
}
CallMetricRecorder& CallMetricRecorder::RecordMemoryUtilizationMetric(
double value) {
internal::MutexLock lock(&mu_);
backend_metric_data_->mem_utilization = value;
return *this;
}
CallMetricRecorder& CallMetricRecorder::RecordUtilizationMetric(
grpc::string_ref name, double value) {
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
backend_metric_data_->utilization[name_sv] = value;
return *this;
}
CallMetricRecorder& CallMetricRecorder::RecordRequestCostMetric(
grpc::string_ref name, double value) {
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
backend_metric_data_->request_cost[name_sv] = value;
return *this;
}
absl::optional<std::string> CallMetricRecorder::CreateSerializedReport() {
upb::Arena arena;
internal::MutexLock lock(&mu_);
bool has_data = backend_metric_data_->cpu_utilization != -1 ||
backend_metric_data_->mem_utilization != -1 ||
!backend_metric_data_->utilization.empty() ||
!backend_metric_data_->request_cost.empty();
if (!has_data) {
return absl::nullopt;
}
xds_data_orca_v3_OrcaLoadReport* response =
xds_data_orca_v3_OrcaLoadReport_new(arena.ptr());
if (backend_metric_data_->cpu_utilization != -1) {
xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(
response, backend_metric_data_->cpu_utilization);
}
if (backend_metric_data_->mem_utilization != -1) {
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(
response, backend_metric_data_->mem_utilization);
}
for (const auto& p : backend_metric_data_->request_cost) {
xds_data_orca_v3_OrcaLoadReport_request_cost_set(
response,
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()),
p.second, arena.ptr());
}
for (const auto& p : backend_metric_data_->utilization) {
xds_data_orca_v3_OrcaLoadReport_utilization_set(
response,
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()),
p.second, arena.ptr());
}
size_t buf_length;
char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(),
&buf_length);
return std::string(buf, buf_length);
}
} // namespace experimental
} // namespace grpc

@ -0,0 +1,79 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "src/cpp/server/orca/orca_interceptor.h"
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/config.h>
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc {
namespace experimental {
void OrcaServerInterceptor::Intercept(InterceptorBatchMethods* methods) {
if (methods->QueryInterceptionHookPoint(
InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
auto context = info_->server_context();
context->CreateCallMetricRecorder();
} else if (methods->QueryInterceptionHookPoint(
InterceptionHookPoints::PRE_SEND_STATUS)) {
auto trailers = methods->GetSendTrailingMetadata();
if (trailers != nullptr) {
auto context = info_->server_context();
auto* recorder = context->call_metric_recorder_;
auto serialized = recorder->CreateSerializedReport();
if (serialized.has_value() && !serialized->empty()) {
std::string key =
std::string(grpc_core::EndpointLoadMetricsBinMetadata::key());
trailers->emplace(
std::make_pair(std::move(key), std::move(serialized.value())));
}
}
}
methods->Proceed();
}
Interceptor* OrcaServerInterceptorFactory::CreateServerInterceptor(
ServerRpcInfo* info) {
return new OrcaServerInterceptor(info);
}
void OrcaServerInterceptorFactory::Register(grpc::ServerBuilder* builder) {
builder->internal_interceptor_creators_.push_back(
absl::make_unique<OrcaServerInterceptorFactory>());
}
void EnableCallMetricRecording(grpc::ServerBuilder* builder) {
OrcaServerInterceptorFactory::Register(builder);
}
} // namespace experimental
} // namespace grpc

@ -0,0 +1,49 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H
#define GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H
#include <grpcpp/support/interceptor.h>
#include <grpcpp/support/server_interceptor.h>
namespace grpc {
class ServerBuilder;
namespace experimental {
class ServerRpcInfo;
class OrcaServerInterceptor : public Interceptor {
public:
explicit OrcaServerInterceptor(ServerRpcInfo* info) : info_(info) {}
void Intercept(InterceptorBatchMethods* methods) override;
private:
ServerRpcInfo* info_;
};
class OrcaServerInterceptorFactory : public ServerInterceptorFactoryInterface {
public:
static void Register(ServerBuilder* builder);
Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override;
};
} // namespace experimental
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H

@ -23,6 +23,7 @@
#include <string.h>
#include <algorithm>
#include <iterator>
#include <memory>
#include <string>
#include <utility>
@ -358,11 +359,18 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
gpr_log(GPR_INFO, "Callback server.");
}
// Merge the application and internal interceptors together.
// Internal interceptors go first.
auto creators = std::move(internal_interceptor_creators_);
creators.insert(creators.end(),
std::make_move_iterator(interceptor_creators_.begin()),
std::make_move_iterator(interceptor_creators_.end()));
std::unique_ptr<grpc::Server> server(new grpc::Server(
&args, sync_server_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
std::move(acceptors_), server_config_fetcher_, resource_quota_,
std::move(interceptor_creators_)));
std::move(creators)));
ServerInitializer* initializer = server->initializer();

@ -38,6 +38,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/call_op_set_interface.h>
@ -54,6 +55,8 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/surface/call.h"
namespace grpc {
@ -282,6 +285,9 @@ ServerContextBase::~ServerContextBase() {
if (default_reactor_used_.load(std::memory_order_relaxed)) {
reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
}
if (call_metric_recorder_ != nullptr) {
call_metric_recorder_->~CallMetricRecorder();
}
}
ServerContextBase::CallWrapper::~CallWrapper() {
@ -395,4 +401,10 @@ void ServerContextBase::SetLoadReportingCosts(
}
}
void ServerContextBase::CreateCallMetricRecorder() {
GPR_ASSERT(call_metric_recorder_ == nullptr);
grpc_core::Arena* arena = grpc_call_get_arena(call_.call);
call_metric_recorder_ = arena->New<experimental::CallMetricRecorder>(arena);
}
} // namespace grpc

@ -565,8 +565,7 @@ class OobBackendMetricTestLoadBalancingPolicy
: address_(std::move(address)), parent_(std::move(parent)) {}
void OnBackendMetricReport(
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData&
backend_metric_data) override {
const BackendMetricData& backend_metric_data) override {
parent_->cb_(address_, backend_metric_data);
}

@ -37,8 +37,7 @@ void RegisterTestPickArgsLoadBalancingPolicy(
struct TrailingMetadataArgsSeen {
absl::Status status;
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
backend_metric_data;
const BackendMetricData* backend_metric_data;
MetadataVector metadata;
};
@ -60,9 +59,8 @@ void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb);
// single subchannel whose address is in its configuration.
void RegisterFixedAddressLoadBalancingPolicy();
using OobBackendMetricCallback = std::function<void(
ServerAddress,
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData&)>;
using OobBackendMetricCallback =
std::function<void(ServerAddress, const BackendMetricData&)>;
// Registers an LB policy called "oob_backend_metric_test_lb" that invokes
// cb for each OOB backend metric report on each subchannel.

@ -493,7 +493,8 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpcpp_orca",
"//:grpcpp_orca_interceptor",
"//:grpcpp_orca_service",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
@ -938,7 +939,7 @@ grpc_cc_test(
],
deps = [
"//:grpc++",
"//:grpcpp_orca",
"//:grpcpp_orca_service",
"//src/proto/grpc/testing/xds/v3:orca_service_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",

@ -37,6 +37,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/ext/orca_service.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/sync.h>
@ -92,11 +93,17 @@ class MyTestServiceImpl : public TestServiceImpl {
}
AddClient(context->peer());
if (request->has_param() && request->param().has_backend_metrics()) {
const auto& load_report = request->param().backend_metrics();
// TODO(roth): Once we provide a more standard server-side API for
// populating this data, use that API here.
context->AddTrailingMetadata("x-endpoint-load-metrics-bin",
load_report.SerializeAsString());
load_report_ = request->param().backend_metrics();
auto* recorder = context->ExperimentalGetCallMetricRecorder();
EXPECT_NE(recorder, nullptr);
recorder->RecordCpuUtilizationMetric(load_report_.cpu_utilization())
.RecordMemoryUtilizationMetric(load_report_.mem_utilization());
for (const auto& p : load_report_.request_cost()) {
recorder->RecordRequestCostMetric(p.first, p.second);
}
for (const auto& p : load_report_.utilization()) {
recorder->RecordUtilizationMetric(p.first, p.second);
}
}
return TestServiceImpl::Echo(context, request, response);
}
@ -126,6 +133,8 @@ class MyTestServiceImpl : public TestServiceImpl {
int request_count_ = 0;
grpc::internal::Mutex clients_mu_;
std::set<std::string> clients_;
// For strings storage.
xds::data::orca::v3::OrcaLoadReport load_report_;
};
class FakeResolverResponseGeneratorWrapper {
@ -374,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
experimental::EnableCallMetricRecording(&builder);
std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
grpc_fake_transport_security_server_credentials_create()));
builder.AddListeningPort(server_address.str(), std::move(creds));
@ -1940,12 +1950,10 @@ TEST_F(ClientLbPickArgsTest, Basic) {
//
xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport(
const grpc_core::LoadBalancingPolicy::BackendMetricAccessor::
BackendMetricData& backend_metric_data) {
const grpc_core::BackendMetricData& backend_metric_data) {
xds::data::orca::v3::OrcaLoadReport load_report;
load_report.set_cpu_utilization(backend_metric_data.cpu_utilization);
load_report.set_mem_utilization(backend_metric_data.mem_utilization);
load_report.set_rps(backend_metric_data.requests_per_second);
for (const auto& p : backend_metric_data.request_cost) {
std::string name(p.first);
(*load_report.mutable_request_cost())[name] = p.second;
@ -2154,7 +2162,6 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
xds::data::orca::v3::OrcaLoadReport load_report;
load_report.set_cpu_utilization(0.5);
load_report.set_mem_utilization(0.75);
load_report.set_rps(25);
auto* request_cost = load_report.mutable_request_cost();
(*request_cost)["foo"] = 0.8;
(*request_cost)["bar"] = 1.4;
@ -2174,7 +2181,6 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
// available in OSS.
EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
EXPECT_EQ(actual->rps(), load_report.rps());
EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
for (const auto& p : actual->request_cost()) {
auto it = load_report.request_cost().find(p.first);
@ -2309,8 +2315,7 @@ class OobBackendMetricTest : public ClientLbEnd2endTest {
private:
static void BackendMetricCallback(
grpc_core::ServerAddress address,
const grpc_core::LoadBalancingPolicy::BackendMetricAccessor::
BackendMetricData& backend_metric_data) {
const grpc_core::BackendMetricData& backend_metric_data) {
auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
int port = grpc_sockaddr_get_port(&address.address());
grpc::internal::MutexLock lock(&current_test_instance_->mu_);

@ -48,8 +48,6 @@ namespace {
class LoggingInterceptor : public experimental::Interceptor {
public:
explicit LoggingInterceptor(experimental::ServerRpcInfo* info) {
info_ = info;
// Check the method name and compare to the type
const char* method = info->method();
experimental::ServerRpcInfo::Type type = info->type();
@ -133,9 +131,6 @@ class LoggingInterceptor : public experimental::Interceptor {
}
methods->Proceed();
}
private:
experimental::ServerRpcInfo* info_;
};
class LoggingInterceptorFactory

@ -948,6 +948,7 @@ include/grpcpp/completion_queue.h \
include/grpcpp/create_channel.h \
include/grpcpp/create_channel_binder.h \
include/grpcpp/create_channel_posix.h \
include/grpcpp/ext/call_metric_recorder.h \
include/grpcpp/ext/health_check_service_server_builder_option.h \
include/grpcpp/generic/async_generic_service.h \
include/grpcpp/generic/generic_stub.h \

@ -948,6 +948,7 @@ include/grpcpp/completion_queue.h \
include/grpcpp/create_channel.h \
include/grpcpp/create_channel_binder.h \
include/grpcpp/create_channel_posix.h \
include/grpcpp/ext/call_metric_recorder.h \
include/grpcpp/ext/health_check_service_server_builder_option.h \
include/grpcpp/generic/async_generic_service.h \
include/grpcpp/generic/generic_stub.h \
@ -1086,6 +1087,7 @@ src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
@ -2529,6 +2531,7 @@ src/cpp/server/health/default_health_check_service.h \
src/cpp/server/health/health_check_service.cc \
src/cpp/server/health/health_check_service_server_builder_option.cc \
src/cpp/server/insecure_server_credentials.cc \
src/cpp/server/orca/call_metric_recorder.cc \
src/cpp/server/secure_server_credentials.cc \
src/cpp/server/secure_server_credentials.h \
src/cpp/server/server_builder.cc \

@ -906,6 +906,7 @@ src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \

Loading…
Cancel
Save