diff --git a/BUILD b/BUILD index 94d85a795ac..2fec1a35e27 100644 --- a/BUILD +++ b/BUILD @@ -2900,6 +2900,7 @@ grpc_cc_library( "error", "gpr_base", "gpr_codegen", + "grpc_backend_metric_data", "grpc_base", "grpc_client_authority_filter", "grpc_codegen", @@ -3246,6 +3247,20 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_backend_metric_data", + hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h", + ], + external_deps = [ + "absl/strings", + ], + language = "c++", + deps = [ + "gpr", + ], +) + grpc_cc_library( name = "grpc_lb_policy_rls", srcs = [ @@ -5532,6 +5547,7 @@ grpc_cc_library( public_hdrs = GRPCXX_PUBLIC_HDRS, visibility = ["@grpc:alt_grpc++_base_legacy"], deps = [ + "arena", "channel_init", "config", "gpr_base", @@ -5547,6 +5563,7 @@ grpc_cc_library( "grpc_service_config_impl", "grpc_trace", "grpc_transport_inproc", + "grpcpp_call_metric_recorder", "iomgr_timer", "ref_counted", "ref_counted_ptr", @@ -5574,6 +5591,7 @@ grpc_cc_library( tags = ["avoid_dep"], visibility = ["@grpc:alt_grpc++_base_unsecure_legacy"], deps = [ + "arena", "channel_init", "config", "gpr_base", @@ -5590,6 +5608,7 @@ grpc_cc_library( "grpc_trace", "grpc_transport_inproc", "grpc_unsecure", + "grpcpp_call_metric_recorder", "iomgr_timer", "ref_counted", "ref_counted_ptr", @@ -5757,7 +5776,62 @@ grpc_cc_library( ) grpc_cc_library( - name = "grpcpp_orca", + name = "grpcpp_call_metric_recorder", + srcs = [ + "src/cpp/server/orca/call_metric_recorder.cc", + ], + external_deps = [ + "upb_lib", + "absl/memory", + "absl/strings", + "absl/types:optional", + ], + language = "c++", + public_hdrs = [ + "include/grpcpp/ext/call_metric_recorder.h", + ], + visibility = ["@grpc:public"], + deps = [ + "arena", + "grpc++_codegen_base", + "grpc++_internal_hdrs_only", + "grpc++_public_hdrs", + "grpc_backend_metric_data", + "xds_orca_upb", + ], +) + +grpc_cc_library( + name = "grpcpp_orca_interceptor", + srcs = [ + "src/cpp/server/orca/orca_interceptor.cc", + ], + hdrs = [ + "src/cpp/server/orca/orca_interceptor.h", + ], + external_deps = [ + "upb_lib", + "absl/memory", + "absl/strings", + "absl/types:optional", + ], + language = "c++", + visibility = ["@grpc:public"], + deps = [ + "grpc++", + "grpc++_codegen_base", + "grpc_base", + "grpcpp_call_metric_recorder", + "protobuf_duration_upb", + "ref_counted", + "time", + "xds_orca_service_upb", + "xds_orca_upb", + ], +) + +grpc_cc_library( + name = "grpcpp_orca_service", srcs = [ "src/cpp/server/orca/orca_service.cc", ], @@ -5766,6 +5840,7 @@ grpc_cc_library( "absl/time", "absl/types:optional", "upb_lib", + "absl/memory", ], language = "c++", public_hdrs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 49387350f22..a4c382fbd8c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3041,6 +3041,7 @@ add_library(grpc++ src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc src/cpp/server/insecure_server_credentials.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/secure_server_credentials.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc @@ -3191,6 +3192,7 @@ foreach(_hdr include/grpcpp/create_channel.h include/grpcpp/create_channel_binder.h include/grpcpp/create_channel_posix.h + include/grpcpp/ext/call_metric_recorder.h include/grpcpp/ext/health_check_service_server_builder_option.h include/grpcpp/generic/async_generic_service.h include/grpcpp/generic/generic_stub.h @@ -3713,6 +3715,7 @@ add_library(grpc++_unsecure src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc src/cpp/server/insecure_server_credentials.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -3860,6 +3863,7 @@ foreach(_hdr include/grpcpp/completion_queue.h include/grpcpp/create_channel.h include/grpcpp/create_channel_posix.h + include/grpcpp/ext/call_metric_recorder.h include/grpcpp/ext/health_check_service_server_builder_option.h include/grpcpp/generic/async_generic_service.h include/grpcpp/generic/generic_stub.h @@ -8203,6 +8207,7 @@ add_executable(binder_transport_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -9383,6 +9388,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h + src/cpp/server/orca/orca_interceptor.cc src/cpp/server/orca/orca_service.cc test/core/util/test_lb_policies.cc test/cpp/end2end/client_lb_end2end_test.cc @@ -10110,6 +10116,7 @@ add_executable(endpoint_binder_pool_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -10528,6 +10535,7 @@ add_executable(fake_binder_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -16990,6 +16998,7 @@ add_executable(transport_stream_receiver_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -17336,6 +17345,7 @@ add_executable(wire_reader_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc @@ -17427,6 +17437,7 @@ add_executable(wire_writer_test src/cpp/server/health/default_health_check_service.cc src/cpp/server/health/health_check_service.cc src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/orca/call_metric_recorder.cc src/cpp/server/server_builder.cc src/cpp/server/server_callback.cc src/cpp/server/server_cc.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7eca2fa9a2c..10442ce6c36 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -330,6 +330,7 @@ libs: - src/core/ext/filters/client_channel/http_proxy.h - src/core/ext/filters/client_channel/lb_policy.h - src/core/ext/filters/client_channel/lb_policy/address_filtering.h + - src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h @@ -1804,6 +1805,7 @@ libs: - src/core/ext/filters/client_channel/http_proxy.h - src/core/ext/filters/client_channel/lb_policy.h - src/core/ext/filters/client_channel/lb_policy/address_filtering.h + - src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h @@ -2546,6 +2548,7 @@ libs: - include/grpcpp/create_channel.h - include/grpcpp/create_channel_binder.h - include/grpcpp/create_channel_posix.h + - include/grpcpp/ext/call_metric_recorder.h - include/grpcpp/ext/health_check_service_server_builder_option.h - include/grpcpp/generic/async_generic_service.h - include/grpcpp/generic/generic_stub.h @@ -2739,6 +2742,7 @@ libs: - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc - src/cpp/server/insecure_server_credentials.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/secure_server_credentials.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc @@ -2945,6 +2949,7 @@ libs: - include/grpcpp/completion_queue.h - include/grpcpp/create_channel.h - include/grpcpp/create_channel_posix.h + - include/grpcpp/ext/call_metric_recorder.h - include/grpcpp/ext/health_check_service_server_builder_option.h - include/grpcpp/generic/async_generic_service.h - include/grpcpp/generic/generic_stub.h @@ -3086,6 +3091,7 @@ libs: - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc - src/cpp/server/insecure_server_credentials.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -4823,6 +4829,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -5259,6 +5266,7 @@ targets: run: false language: c++ headers: + - src/cpp/server/orca/orca_interceptor.h - test/core/util/test_lb_policies.h - test/cpp/end2end/connection_delay_injector.h - test/cpp/end2end/test_service_impl.h @@ -5268,6 +5276,7 @@ targets: - src/proto/grpc/testing/echo_messages.proto - src/proto/grpc/testing/simple_messages.proto - src/proto/grpc/testing/xds/v3/orca_load_report.proto + - src/cpp/server/orca/orca_interceptor.cc - src/cpp/server/orca/orca_service.cc - test/core/util/test_lb_policies.cc - test/cpp/end2end/client_lb_end2end_test.cc @@ -5557,6 +5566,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -5781,6 +5791,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -8427,6 +8438,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -8619,6 +8631,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc @@ -8713,6 +8726,7 @@ targets: - src/cpp/server/health/default_health_check_service.cc - src/cpp/server/health/health_check_service.cc - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/orca/call_metric_recorder.cc - src/cpp/server/server_builder.cc - src/cpp/server/server_callback.cc - src/cpp/server/server_cc.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 6223576771a..48deddb9da6 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -86,6 +86,7 @@ Pod::Spec.new do |s| 'include/grpcpp/create_channel.h', 'include/grpcpp/create_channel_binder.h', 'include/grpcpp/create_channel_posix.h', + 'include/grpcpp/ext/call_metric_recorder.h', 'include/grpcpp/ext/health_check_service_server_builder_option.h', 'include/grpcpp/generic/async_generic_service.h', 'include/grpcpp/generic/generic_stub.h', @@ -233,6 +234,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', + 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', @@ -987,6 +989,7 @@ Pod::Spec.new do |s| 'src/cpp/server/health/health_check_service.cc', 'src/cpp/server/health/health_check_service_server_builder_option.cc', 'src/cpp/server/insecure_server_credentials.cc', + 'src/cpp/server/orca/call_metric_recorder.cc', 'src/cpp/server/secure_server_credentials.cc', 'src/cpp/server/secure_server_credentials.h', 'src/cpp/server/server_builder.cc', @@ -1065,6 +1068,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', + 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index af6d75604fd..21dcdb429ab 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -229,6 +229,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy.h', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', + 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', @@ -1686,6 +1687,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', + 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', diff --git a/grpc.gemspec b/grpc.gemspec index 7ebde4197c5..44d9ebe8979 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -142,6 +142,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index 1daffa43072..700a2370f6f 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1511,6 +1511,7 @@ 'src/cpp/server/health/health_check_service.cc', 'src/cpp/server/health/health_check_service_server_builder_option.cc', 'src/cpp/server/insecure_server_credentials.cc', + 'src/cpp/server/orca/call_metric_recorder.cc', 'src/cpp/server/secure_server_credentials.cc', 'src/cpp/server/server_builder.cc', 'src/cpp/server/server_callback.cc', @@ -1636,6 +1637,7 @@ 'src/cpp/server/health/health_check_service.cc', 'src/cpp/server/health/health_check_service_server_builder_option.cc', 'src/cpp/server/insecure_server_credentials.cc', + 'src/cpp/server/orca/call_metric_recorder.cc', 'src/cpp/server/server_builder.cc', 'src/cpp/server/server_callback.cc', 'src/cpp/server/server_cc.cc', diff --git a/include/grpcpp/ext/call_metric_recorder.h b/include/grpcpp/ext/call_metric_recorder.h new file mode 100644 index 00000000000..9c526226dd3 --- /dev/null +++ b/include/grpcpp/ext/call_metric_recorder.h @@ -0,0 +1,94 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_EXT_CALL_METRIC_RECORDER_H +#define GRPCPP_EXT_CALL_METRIC_RECORDER_H + +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include +#include + +namespace grpc_core { +class Arena; +struct BackendMetricData; +} // namespace grpc_core + +namespace grpc { +class ServerBuilder; + +namespace experimental { +class OrcaServerInterceptor; + +// Registers the per-rpc orca load reporter into the \a ServerBuilder. +// Once this is done, the server will automatically send the load metrics +// after each RPC as they were reported. In order to report load metrics, +// call the \a ServerContext::ExperimentalGetCallMetricRecorder() method to +// retrieve the recorder for the current call. +void EnableCallMetricRecording(ServerBuilder*); + +/// Records call metrics for the purpose of load balancing. +/// During an RPC, call \a ServerContext::ExperimentalGetCallMetricRecorder() +/// method to retrive the recorder for the current call. +class CallMetricRecorder { + public: + explicit CallMetricRecorder(grpc_core::Arena* arena); + ~CallMetricRecorder(); + + /// Records a call metric measurement for CPU utilization. + /// Multiple calls to this method will override the stored value. + CallMetricRecorder& RecordCpuUtilizationMetric(double value); + + /// Records a call metric measurement for memory utilization. + /// Multiple calls to this method will override the stored value. + CallMetricRecorder& RecordMemoryUtilizationMetric(double value); + + /// Records a call metric measurement for utilization. + /// Multiple calls to this method with the same name will + /// override the corresponding stored value. The lifetime of the + /// name string needs to be longer than the lifetime of the RPC + /// itself, since it's going to be sent as trailers after the RPC + /// finishes. It is assumed the strings are common names that + /// are global constants. + CallMetricRecorder& RecordUtilizationMetric(string_ref name, double value); + + /// Records a call metric measurement for request cost. + /// Multiple calls to this method with the same name will + /// override the corresponding stored value. The lifetime of the + /// name string needs to be longer than the lifetime of the RPC + /// itself, since it's going to be sent as trailers after the RPC + /// finishes. It is assumed the strings are common names that + /// are global constants. + CallMetricRecorder& RecordRequestCostMetric(string_ref name, double value); + + private: + absl::optional CreateSerializedReport(); + + internal::Mutex mu_; + grpc_core::BackendMetricData* backend_metric_data_ ABSL_GUARDED_BY(&mu_); + friend class experimental::OrcaServerInterceptor; +}; + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_EXT_CALL_METRIC_RECORDER_H diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 8faea760a85..144505330d8 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -21,13 +21,16 @@ // IWYU pragma: private, include +#include #include +#include #include #include #include #include #include +#include namespace grpc { diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 30da8a1eab0..c6e08c0239a 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -117,6 +117,11 @@ class ServerContextTestSpouse; class DefaultReactorTestPeer; } // namespace testing +namespace experimental { +class OrcaServerInterceptor; +class CallMetricRecorder; +} // namespace experimental + /// Base class of ServerContext. class ServerContextBase { public: @@ -283,6 +288,15 @@ class ServerContextBase { /// Applications never need to call this method. grpc_call* c_call() { return call_.call; } + /// Get the \a CallMetricRecorder object for the current RPC. + /// Use it to record metrics during your RPC to send back to the + /// client in order to make load balancing decisions. This will + /// return nullptr if the feature hasn't been enabled using + /// \a EnableCallMetricRecording. + experimental::CallMetricRecorder* ExperimentalGetCallMetricRecorder() { + return call_metric_recorder_; + } + protected: /// Async only. Has to be called before the rpc starts. /// Returns the tag in completion queue when the rpc finishes. @@ -388,6 +402,7 @@ class ServerContextBase { friend class grpc::ClientContext; friend class grpc::GenericServerContext; friend class grpc::GenericCallbackServerContext; + friend class grpc::experimental::OrcaServerInterceptor; /// Prevent copying. ServerContextBase(const ServerContextBase&); @@ -429,6 +444,8 @@ class ServerContextBase { } } + void CreateCallMetricRecorder(); + struct CallWrapper { ~CallWrapper(); @@ -466,6 +483,7 @@ class ServerContextBase { grpc::experimental::ServerRpcInfo* rpc_info_ = nullptr; RpcAllocatorState* message_allocator_state_ = nullptr; ContextAllocator* context_allocator_ = nullptr; + experimental::CallMetricRecorder* call_metric_recorder_ = nullptr; class Reactor : public grpc::ServerUnaryReactor { public: diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index e542ef2c5ad..2f72a872dfe 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -59,6 +59,7 @@ class ExternalConnectionAcceptorImpl; class CallbackGenericService; namespace experimental { +class OrcaServerInterceptorFactory; // EXPERIMENTAL API: // Interface for a grpc server to build transports with connections created out // of band. @@ -352,6 +353,7 @@ class ServerBuilder { private: friend class grpc::testing::ServerBuilderPluginTest; + friend class grpc::experimental::OrcaServerInterceptorFactory; struct SyncServerSettings { SyncServerSettings() @@ -402,6 +404,9 @@ class ServerBuilder { std::vector< std::unique_ptr> interceptor_creators_; + std::vector< + std::unique_ptr> + internal_interceptor_creators_; std::vector> acceptors_; grpc_server_config_fetcher* server_config_fetcher_ = nullptr; diff --git a/package.xml b/package.xml index 96e57d8b959..278ebf787fa 100644 --- a/package.xml +++ b/package.xml @@ -124,6 +124,7 @@ + diff --git a/src/core/ext/filters/client_channel/backend_metric.cc b/src/core/ext/filters/client_channel/backend_metric.cc index 99f77a6e259..101893c6809 100644 --- a/src/core/ext/filters/client_channel/backend_metric.cc +++ b/src/core/ext/filters/client_channel/backend_metric.cc @@ -54,22 +54,20 @@ std::map ParseMap( } // namespace -const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* -ParseBackendMetricData(absl::string_view serialized_load_report, - BackendMetricAllocatorInterface* allocator) { +const BackendMetricData* ParseBackendMetricData( + absl::string_view serialized_load_report, + BackendMetricAllocatorInterface* allocator) { upb::Arena upb_arena; xds_data_orca_v3_OrcaLoadReport* msg = xds_data_orca_v3_OrcaLoadReport_parse( serialized_load_report.data(), serialized_load_report.size(), upb_arena.ptr()); if (msg == nullptr) return nullptr; - LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - backend_metric_data = allocator->AllocateBackendMetricData(); + BackendMetricData* backend_metric_data = + allocator->AllocateBackendMetricData(); backend_metric_data->cpu_utilization = xds_data_orca_v3_OrcaLoadReport_cpu_utilization(msg); backend_metric_data->mem_utilization = xds_data_orca_v3_OrcaLoadReport_mem_utilization(msg); - backend_metric_data->requests_per_second = - xds_data_orca_v3_OrcaLoadReport_rps(msg); backend_metric_data->request_cost = ParseMap( msg, xds_data_orca_v3_OrcaLoadReport_request_cost_next, diff --git a/src/core/ext/filters/client_channel/backend_metric.h b/src/core/ext/filters/client_channel/backend_metric.h index 9f8c7531b7f..b3f8db6d66f 100644 --- a/src/core/ext/filters/client_channel/backend_metric.h +++ b/src/core/ext/filters/client_channel/backend_metric.h @@ -23,7 +23,7 @@ #include "absl/strings/string_view.h" -#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" namespace grpc_core { @@ -31,17 +31,16 @@ class BackendMetricAllocatorInterface { public: virtual ~BackendMetricAllocatorInterface() = default; - virtual LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - AllocateBackendMetricData() = 0; + virtual BackendMetricData* AllocateBackendMetricData() = 0; virtual char* AllocateString(size_t size) = 0; }; // Parses the serialized load report and populates out. // Returns false on error. -const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* -ParseBackendMetricData(absl::string_view serialized_load_report, - BackendMetricAllocatorInterface* allocator); +const BackendMetricData* ParseBackendMetricData( + absl::string_view serialized_load_report, + BackendMetricAllocatorInterface* allocator); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 899d560faf3..a8eb032800e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -2574,7 +2574,7 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor if (lb_call_->backend_metric_data_ == nullptr && lb_call_->recv_trailing_metadata_ != nullptr) { if (const auto* md = lb_call_->recv_trailing_metadata_->get_pointer( - XEndpointLoadMetricsBinMetadata())) { + EndpointLoadMetricsBinMetadata())) { BackendMetricAllocator allocator(lb_call_->arena_); lb_call_->backend_metric_data_ = ParseBackendMetricData(md->as_string_view(), &allocator); @@ -2588,10 +2588,8 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor public: explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {} - LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - AllocateBackendMetricData() override { - return arena_->New< - LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData>(); + BackendMetricData* AllocateBackendMetricData() override { + return arena_->New(); } char* AllocateString(size_t size) override { @@ -2651,8 +2649,7 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() { GRPC_ERROR_UNREF(cancel_error_); GRPC_ERROR_UNREF(failure_error_); if (backend_metric_data_ != nullptr) { - backend_metric_data_->LoadBalancingPolicy::BackendMetricAccessor:: - BackendMetricData::~BackendMetricData(); + backend_metric_data_->BackendMetricData::~BackendMetricData(); } // Make sure there are no remaining pending batches. for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index bec8e5c1e01..a0838a24311 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -40,6 +40,7 @@ #include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/dynamic_filters.h" #include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/channel/call_tracer.h" @@ -501,8 +502,7 @@ class ClientChannel::LoadBalancedCall ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr; RefCountedPtr connected_subchannel_; - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - backend_metric_data_ = nullptr; + const BackendMetricData* backend_metric_data_ = nullptr; std::unique_ptr lb_subchannel_call_tracker_; diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index a6e52a2bdfe..3cd542b50f6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -38,6 +37,7 @@ #include #include +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" @@ -163,26 +163,6 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// SubchannelCallTrackerInterface. class BackendMetricAccessor { public: - // Represents backend metrics reported by the backend to the client. - struct BackendMetricData { - /// CPU utilization expressed as a fraction of available CPU resources. - double cpu_utilization; - /// Memory utilization expressed as a fraction of available memory - /// resources. - double mem_utilization; - /// Total requests per second being served by the backend. This - /// should include all services that a backend is responsible for. - uint64_t requests_per_second; - /// Application-specific requests cost metrics. Metric names are - /// determined by the application. Each value is an absolute cost - /// (e.g. 3487 bytes of storage) associated with the request. - std::map request_cost; - /// Application-specific resource utilization metrics. Metric names - /// are determined by the application. Each value is expressed as a - /// fraction of total resources available. - std::map utilization; - }; - virtual ~BackendMetricAccessor() = default; /// Returns the backend metric data returned by the server for the call, diff --git a/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h b/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h new file mode 100644 index 00000000000..6b5f156acd4 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H + +#include + +#include + +#include "absl/strings/string_view.h" + +namespace grpc_core { + +// Represents backend metrics reported by the backend to the client. +struct BackendMetricData { + /// CPU utilization expressed as a fraction of available CPU resources. + double cpu_utilization = -1; + /// Memory utilization expressed as a fraction of available memory + /// resources. + double mem_utilization = -1; + /// Application-specific requests cost metrics. Metric names are + /// determined by the application. Each value is an absolute cost + /// (e.g. 3487 bytes of storage) associated with the request. + std::map request_cost; + /// Application-specific resource utilization metrics. Metric names + /// are determined by the application. Each value is expressed as a + /// fraction of total resources available. + std::map utilization; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_BACKEND_METRIC_DATA_H diff --git a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc index 4e7be864ba2..e9b79473690 100644 --- a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc +++ b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc @@ -103,9 +103,7 @@ class OrcaProducer : public Subchannel::DataProducerInterface { void OnConnectivityStateChange(grpc_connectivity_state state); // Called to notify watchers of a new backend metric report. - void NotifyWatchers( - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData& - backend_metric_data); + void NotifyWatchers(const BackendMetricData& backend_metric_data); RefCountedPtr subchannel_; RefCountedPtr connected_subchannel_; @@ -247,8 +245,7 @@ class OrcaProducer::OrcaStreamEventHandler explicit BackendMetricAllocator(WeakRefCountedPtr producer) : producer_(std::move(producer)) {} - LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - AllocateBackendMetricData() override { + BackendMetricData* AllocateBackendMetricData() override { return &backend_metric_data_; } @@ -274,8 +271,7 @@ class OrcaProducer::OrcaStreamEventHandler } WeakRefCountedPtr producer_; - LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData - backend_metric_data_; + BackendMetricData backend_metric_data_; std::vector> string_storage_; grpc_closure closure_; }; @@ -354,8 +350,7 @@ void OrcaProducer::MaybeStartStreamLocked() { } void OrcaProducer::NotifyWatchers( - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData& - backend_metric_data) { + const BackendMetricData& backend_metric_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace)) { gpr_log(GPR_INFO, "OrcaProducer %p: reporting backend metrics to watchers", this); diff --git a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h index 6025b521040..3f4b6388bf2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h +++ b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h @@ -21,7 +21,7 @@ #include -#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/lib/gprpp/time.h" @@ -45,8 +45,7 @@ class OobBackendMetricWatcher { virtual ~OobBackendMetricWatcher() = default; virtual void OnBackendMetricReport( - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData& - backend_metric_data) = 0; + const BackendMetricData& backend_metric_data) = 0; }; std::unique_ptr diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index f05f7af5ed8..afafa3378bb 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -228,10 +228,10 @@ struct HostMetadata : public SimpleSliceBasedMetadata { static absl::string_view key() { return "host"; } }; -// x-endpoint-load-metrics-bin metadata trait. -struct XEndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata { +// endpoint-load-metrics-bin metadata trait. +struct EndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata { static constexpr bool kRepeatable = false; - static absl::string_view key() { return "x-endpoint-load-metrics-bin"; } + static absl::string_view key() { return "endpoint-load-metrics-bin"; } }; // grpc-server-stats-bin metadata trait. @@ -1248,7 +1248,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap< grpc_core::GrpcTimeoutMetadata, grpc_core::GrpcPreviousRpcAttemptsMetadata, grpc_core::GrpcRetryPushbackMsMetadata, grpc_core::UserAgentMetadata, grpc_core::GrpcMessageMetadata, grpc_core::HostMetadata, - grpc_core::XEndpointLoadMetricsBinMetadata, + grpc_core::EndpointLoadMetricsBinMetadata, grpc_core::GrpcServerStatsBinMetadata, grpc_core::GrpcTraceBinMetadata, grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata, grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata, diff --git a/src/cpp/server/orca/call_metric_recorder.cc b/src/cpp/server/orca/call_metric_recorder.cc new file mode 100644 index 00000000000..7fffc9f38b1 --- /dev/null +++ b/src/cpp/server/orca/call_metric_recorder.cc @@ -0,0 +1,116 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "upb/upb.h" +#include "upb/upb.hpp" +#include "xds/data/orca/v3/orca_load_report.upb.h" + +#include +#include +#include +#include + +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" +#include "src/core/lib/resource_quota/arena.h" + +namespace grpc { +namespace experimental { + +CallMetricRecorder::CallMetricRecorder(grpc_core::Arena* arena) + : backend_metric_data_(arena->New()) {} + +CallMetricRecorder::~CallMetricRecorder() { + backend_metric_data_->~BackendMetricData(); +} + +CallMetricRecorder& CallMetricRecorder::RecordCpuUtilizationMetric( + double value) { + internal::MutexLock lock(&mu_); + backend_metric_data_->cpu_utilization = value; + return *this; +} + +CallMetricRecorder& CallMetricRecorder::RecordMemoryUtilizationMetric( + double value) { + internal::MutexLock lock(&mu_); + backend_metric_data_->mem_utilization = value; + return *this; +} + +CallMetricRecorder& CallMetricRecorder::RecordUtilizationMetric( + grpc::string_ref name, double value) { + internal::MutexLock lock(&mu_); + absl::string_view name_sv(name.data(), name.length()); + backend_metric_data_->utilization[name_sv] = value; + return *this; +} + +CallMetricRecorder& CallMetricRecorder::RecordRequestCostMetric( + grpc::string_ref name, double value) { + internal::MutexLock lock(&mu_); + absl::string_view name_sv(name.data(), name.length()); + backend_metric_data_->request_cost[name_sv] = value; + return *this; +} + +absl::optional CallMetricRecorder::CreateSerializedReport() { + upb::Arena arena; + internal::MutexLock lock(&mu_); + bool has_data = backend_metric_data_->cpu_utilization != -1 || + backend_metric_data_->mem_utilization != -1 || + !backend_metric_data_->utilization.empty() || + !backend_metric_data_->request_cost.empty(); + if (!has_data) { + return absl::nullopt; + } + xds_data_orca_v3_OrcaLoadReport* response = + xds_data_orca_v3_OrcaLoadReport_new(arena.ptr()); + if (backend_metric_data_->cpu_utilization != -1) { + xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization( + response, backend_metric_data_->cpu_utilization); + } + if (backend_metric_data_->mem_utilization != -1) { + xds_data_orca_v3_OrcaLoadReport_set_mem_utilization( + response, backend_metric_data_->mem_utilization); + } + for (const auto& p : backend_metric_data_->request_cost) { + xds_data_orca_v3_OrcaLoadReport_request_cost_set( + response, + upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), + p.second, arena.ptr()); + } + for (const auto& p : backend_metric_data_->utilization) { + xds_data_orca_v3_OrcaLoadReport_utilization_set( + response, + upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), + p.second, arena.ptr()); + } + size_t buf_length; + char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(), + &buf_length); + return std::string(buf, buf_length); +} + +} // namespace experimental +} // namespace grpc diff --git a/src/cpp/server/orca/orca_interceptor.cc b/src/cpp/server/orca/orca_interceptor.cc new file mode 100644 index 00000000000..399f95c5696 --- /dev/null +++ b/src/cpp/server/orca/orca_interceptor.cc @@ -0,0 +1,79 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "src/cpp/server/orca/orca_interceptor.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include +#include +#include +#include + +#include "src/core/lib/transport/metadata_batch.h" + +namespace grpc { +namespace experimental { + +void OrcaServerInterceptor::Intercept(InterceptorBatchMethods* methods) { + if (methods->QueryInterceptionHookPoint( + InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) { + auto context = info_->server_context(); + context->CreateCallMetricRecorder(); + } else if (methods->QueryInterceptionHookPoint( + InterceptionHookPoints::PRE_SEND_STATUS)) { + auto trailers = methods->GetSendTrailingMetadata(); + if (trailers != nullptr) { + auto context = info_->server_context(); + auto* recorder = context->call_metric_recorder_; + auto serialized = recorder->CreateSerializedReport(); + if (serialized.has_value() && !serialized->empty()) { + std::string key = + std::string(grpc_core::EndpointLoadMetricsBinMetadata::key()); + trailers->emplace( + std::make_pair(std::move(key), std::move(serialized.value()))); + } + } + } + methods->Proceed(); +} + +Interceptor* OrcaServerInterceptorFactory::CreateServerInterceptor( + ServerRpcInfo* info) { + return new OrcaServerInterceptor(info); +} + +void OrcaServerInterceptorFactory::Register(grpc::ServerBuilder* builder) { + builder->internal_interceptor_creators_.push_back( + absl::make_unique()); +} + +void EnableCallMetricRecording(grpc::ServerBuilder* builder) { + OrcaServerInterceptorFactory::Register(builder); +} + +} // namespace experimental +} // namespace grpc diff --git a/src/cpp/server/orca/orca_interceptor.h b/src/cpp/server/orca/orca_interceptor.h new file mode 100644 index 00000000000..40c8c13904a --- /dev/null +++ b/src/cpp/server/orca/orca_interceptor.h @@ -0,0 +1,49 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H +#define GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H + +#include +#include + +namespace grpc { + +class ServerBuilder; + +namespace experimental { +class ServerRpcInfo; + +class OrcaServerInterceptor : public Interceptor { + public: + explicit OrcaServerInterceptor(ServerRpcInfo* info) : info_(info) {} + + void Intercept(InterceptorBatchMethods* methods) override; + + private: + ServerRpcInfo* info_; +}; + +class OrcaServerInterceptorFactory : public ServerInterceptorFactoryInterface { + public: + static void Register(ServerBuilder* builder); + Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override; +}; + +} // namespace experimental +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_ORCA_ORCA_INTERCEPTOR_H diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 2d6de7d6903..b9b7c84202a 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -358,11 +359,18 @@ std::unique_ptr ServerBuilder::BuildAndStart() { gpr_log(GPR_INFO, "Callback server."); } + // Merge the application and internal interceptors together. + // Internal interceptors go first. + auto creators = std::move(internal_interceptor_creators_); + creators.insert(creators.end(), + std::make_move_iterator(interceptor_creators_.begin()), + std::make_move_iterator(interceptor_creators_.end())); + std::unique_ptr server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, std::move(acceptors_), server_config_fetcher_, resource_quota_, - std::move(interceptor_creators_))); + std::move(creators))); ServerInitializer* initializer = server->initializer(); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 588227431f9..a5965203aed 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -54,6 +55,8 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/surface/call.h" namespace grpc { @@ -282,6 +285,9 @@ ServerContextBase::~ServerContextBase() { if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast(&default_reactor_)->~Reactor(); } + if (call_metric_recorder_ != nullptr) { + call_metric_recorder_->~CallMetricRecorder(); + } } ServerContextBase::CallWrapper::~CallWrapper() { @@ -395,4 +401,10 @@ void ServerContextBase::SetLoadReportingCosts( } } +void ServerContextBase::CreateCallMetricRecorder() { + GPR_ASSERT(call_metric_recorder_ == nullptr); + grpc_core::Arena* arena = grpc_call_get_arena(call_.call); + call_metric_recorder_ = arena->New(arena); +} + } // namespace grpc diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index efdd7b2377d..40c8c509d6f 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -565,8 +565,7 @@ class OobBackendMetricTestLoadBalancingPolicy : address_(std::move(address)), parent_(std::move(parent)) {} void OnBackendMetricReport( - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData& - backend_metric_data) override { + const BackendMetricData& backend_metric_data) override { parent_->cb_(address_, backend_metric_data); } diff --git a/test/core/util/test_lb_policies.h b/test/core/util/test_lb_policies.h index 4c2f392ee76..d5815ef2921 100644 --- a/test/core/util/test_lb_policies.h +++ b/test/core/util/test_lb_policies.h @@ -37,8 +37,7 @@ void RegisterTestPickArgsLoadBalancingPolicy( struct TrailingMetadataArgsSeen { absl::Status status; - const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData* - backend_metric_data; + const BackendMetricData* backend_metric_data; MetadataVector metadata; }; @@ -60,9 +59,8 @@ void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb); // single subchannel whose address is in its configuration. void RegisterFixedAddressLoadBalancingPolicy(); -using OobBackendMetricCallback = std::function; +using OobBackendMetricCallback = + std::function; // Registers an LB policy called "oob_backend_metric_test_lb" that invokes // cb for each OOB backend metric report on each subchannel. diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 7b4541fb707..5d2c4ef1e15 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -493,7 +493,8 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:grpc++", - "//:grpcpp_orca", + "//:grpcpp_orca_interceptor", + "//:grpcpp_orca_service", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", @@ -938,7 +939,7 @@ grpc_cc_test( ], deps = [ "//:grpc++", - "//:grpcpp_orca", + "//:grpcpp_orca_service", "//src/proto/grpc/testing/xds/v3:orca_service_proto", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index adbca7ac62c..a12dabf7e6b 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -92,11 +93,17 @@ class MyTestServiceImpl : public TestServiceImpl { } AddClient(context->peer()); if (request->has_param() && request->param().has_backend_metrics()) { - const auto& load_report = request->param().backend_metrics(); - // TODO(roth): Once we provide a more standard server-side API for - // populating this data, use that API here. - context->AddTrailingMetadata("x-endpoint-load-metrics-bin", - load_report.SerializeAsString()); + load_report_ = request->param().backend_metrics(); + auto* recorder = context->ExperimentalGetCallMetricRecorder(); + EXPECT_NE(recorder, nullptr); + recorder->RecordCpuUtilizationMetric(load_report_.cpu_utilization()) + .RecordMemoryUtilizationMetric(load_report_.mem_utilization()); + for (const auto& p : load_report_.request_cost()) { + recorder->RecordRequestCostMetric(p.first, p.second); + } + for (const auto& p : load_report_.utilization()) { + recorder->RecordUtilizationMetric(p.first, p.second); + } } return TestServiceImpl::Echo(context, request, response); } @@ -126,6 +133,8 @@ class MyTestServiceImpl : public TestServiceImpl { int request_count_ = 0; grpc::internal::Mutex clients_mu_; std::set clients_; + // For strings storage. + xds::data::orca::v3::OrcaLoadReport load_report_; }; class FakeResolverResponseGeneratorWrapper { @@ -374,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; + experimental::EnableCallMetricRecording(&builder); std::shared_ptr creds(new SecureServerCredentials( grpc_fake_transport_security_server_credentials_create())); builder.AddListeningPort(server_address.str(), std::move(creds)); @@ -1940,12 +1950,10 @@ TEST_F(ClientLbPickArgsTest, Basic) { // xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport( - const grpc_core::LoadBalancingPolicy::BackendMetricAccessor:: - BackendMetricData& backend_metric_data) { + const grpc_core::BackendMetricData& backend_metric_data) { xds::data::orca::v3::OrcaLoadReport load_report; load_report.set_cpu_utilization(backend_metric_data.cpu_utilization); load_report.set_mem_utilization(backend_metric_data.mem_utilization); - load_report.set_rps(backend_metric_data.requests_per_second); for (const auto& p : backend_metric_data.request_cost) { std::string name(p.first); (*load_report.mutable_request_cost())[name] = p.second; @@ -2154,7 +2162,6 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { xds::data::orca::v3::OrcaLoadReport load_report; load_report.set_cpu_utilization(0.5); load_report.set_mem_utilization(0.75); - load_report.set_rps(25); auto* request_cost = load_report.mutable_request_cost(); (*request_cost)["foo"] = 0.8; (*request_cost)["bar"] = 1.4; @@ -2174,7 +2181,6 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { // available in OSS. EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization()); EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization()); - EXPECT_EQ(actual->rps(), load_report.rps()); EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size()); for (const auto& p : actual->request_cost()) { auto it = load_report.request_cost().find(p.first); @@ -2309,8 +2315,7 @@ class OobBackendMetricTest : public ClientLbEnd2endTest { private: static void BackendMetricCallback( grpc_core::ServerAddress address, - const grpc_core::LoadBalancingPolicy::BackendMetricAccessor:: - BackendMetricData& backend_metric_data) { + const grpc_core::BackendMetricData& backend_metric_data) { auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data); int port = grpc_sockaddr_get_port(&address.address()); grpc::internal::MutexLock lock(¤t_test_instance_->mu_); diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 571d8820bba..0936398b2df 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -48,8 +48,6 @@ namespace { class LoggingInterceptor : public experimental::Interceptor { public: explicit LoggingInterceptor(experimental::ServerRpcInfo* info) { - info_ = info; - // Check the method name and compare to the type const char* method = info->method(); experimental::ServerRpcInfo::Type type = info->type(); @@ -133,9 +131,6 @@ class LoggingInterceptor : public experimental::Interceptor { } methods->Proceed(); } - - private: - experimental::ServerRpcInfo* info_; }; class LoggingInterceptorFactory diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 13b1fbfe357..bf3aff40492 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -948,6 +948,7 @@ include/grpcpp/completion_queue.h \ include/grpcpp/create_channel.h \ include/grpcpp/create_channel_binder.h \ include/grpcpp/create_channel_posix.h \ +include/grpcpp/ext/call_metric_recorder.h \ include/grpcpp/ext/health_check_service_server_builder_option.h \ include/grpcpp/generic/async_generic_service.h \ include/grpcpp/generic/generic_stub.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 61a2c6c5071..18095e02e3d 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -948,6 +948,7 @@ include/grpcpp/completion_queue.h \ include/grpcpp/create_channel.h \ include/grpcpp/create_channel_binder.h \ include/grpcpp/create_channel_posix.h \ +include/grpcpp/ext/call_metric_recorder.h \ include/grpcpp/ext/health_check_service_server_builder_option.h \ include/grpcpp/generic/async_generic_service.h \ include/grpcpp/generic/generic_stub.h \ @@ -1086,6 +1087,7 @@ src/core/ext/filters/client_channel/lb_policy.cc \ src/core/ext/filters/client_channel/lb_policy.h \ src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \ +src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ @@ -2529,6 +2531,7 @@ src/cpp/server/health/default_health_check_service.h \ src/cpp/server/health/health_check_service.cc \ src/cpp/server/health/health_check_service_server_builder_option.cc \ src/cpp/server/insecure_server_credentials.cc \ +src/cpp/server/orca/call_metric_recorder.cc \ src/cpp/server/secure_server_credentials.cc \ src/cpp/server/secure_server_credentials.h \ src/cpp/server/server_builder.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index e5c35d5dff0..ae6661ae88f 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -906,6 +906,7 @@ src/core/ext/filters/client_channel/lb_policy.cc \ src/core/ext/filters/client_channel/lb_policy.h \ src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \ +src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \