From 60c56f7d018ff3aaae77611897edc0dc3bbc013a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 8 Apr 2022 11:12:18 -0700 Subject: [PATCH] Second attempt: implement ORCA RPC service for OOB backend metric reporting (#29352) * Revert "Revert "ORCA: implement ORCA RPC service for OOB backend metric reporting (#29215)" (#29351)" This reverts commit 71b355624f5a8edc6c9a01fd11b40f38a09c3659. * move ORCA service to its own BUILD rule --- BUILD | 30 +++ CMakeLists.txt | 61 +++++ build_autogenerated.yaml | 38 +++ include/grpcpp/ext/orca_service.h | 83 +++++++ .../xds/service/orca/v3/orca.upb.c | 47 ++++ .../xds/service/orca/v3/orca.upb.h | 109 +++++++++ src/cpp/server/orca/orca_service.cc | 224 ++++++++++++++++++ src/proto/grpc/testing/xds/v3/BUILD | 12 + .../grpc/testing/xds/v3/orca_service.proto | 47 ++++ test/cpp/end2end/BUILD | 15 ++ test/cpp/end2end/orca_service_end2end_test.cc | 200 ++++++++++++++++ tools/run_tests/generated/tests.json | 24 ++ 12 files changed, 890 insertions(+) create mode 100644 include/grpcpp/ext/orca_service.h create mode 100644 src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c create mode 100644 src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.h create mode 100644 src/cpp/server/orca/orca_service.cc create mode 100644 src/proto/grpc/testing/xds/v3/orca_service.proto create mode 100644 test/cpp/end2end/orca_service_end2end_test.cc diff --git a/BUILD b/BUILD index 1f2df11a234..9644f9516e5 100644 --- a/BUILD +++ b/BUILD @@ -4972,6 +4972,31 @@ grpc_cc_library( alwayslink = 1, ) +grpc_cc_library( + name = "grpcpp_orca", + srcs = [ + "src/cpp/server/orca/orca_service.cc", + ], + external_deps = [ + "upb_lib", + ], + language = "c++", + public_hdrs = [ + "include/grpcpp/ext/orca_service.h", + ], + visibility = ["@grpc:public"], + deps = [ + "grpc++", + "grpc++_codegen_base", + "grpc_base", + "protobuf_duration_upb", + "time", + "xds_orca_service_upb", + "xds_orca_upb", + ], + alwayslink = 1, +) + grpc_cc_library( name = "grpcpp_channelz", srcs = [ @@ -5341,6 +5366,11 @@ grpc_upb_proto_library( deps = ["@com_github_cncf_udpa//xds/data/orca/v3:pkg"], ) +grpc_upb_proto_library( + name = "xds_orca_service_upb", + deps = ["@com_github_cncf_udpa//xds/service/orca/v3:pkg"], +) + grpc_upb_proto_library( name = "grpc_health_upb", deps = ["//src/proto/grpc/health/v1:health_proto_descriptor"], diff --git a/CMakeLists.txt b/CMakeLists.txt index 938e8d32fa9..4b5b2e25879 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -568,6 +568,9 @@ protobuf_generate_grpc_cpp( protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/v3/orca_load_report.proto ) +protobuf_generate_grpc_cpp( + src/proto/grpc/testing/xds/v3/orca_service.proto +) protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/v3/path.proto ) @@ -925,6 +928,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx nonblocking_test) add_dependencies(buildtests_cxx observable_test) + add_dependencies(buildtests_cxx orca_service_end2end_test) add_dependencies(buildtests_cxx orphanable_test) add_dependencies(buildtests_cxx out_of_bounds_bad_client_test) add_dependencies(buildtests_cxx overload_test) @@ -12996,6 +13000,63 @@ target_link_libraries(observable_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(orca_service_end2end_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_service.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_service.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h + src/core/ext/upb-generated/google/api/annotations.upb.c + src/core/ext/upb-generated/google/api/http.upb.c + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/protobuf/descriptor.upb.c + src/core/ext/upb-generated/google/protobuf/duration.upb.c + src/core/ext/upb-generated/google/protobuf/empty.upb.c + src/core/ext/upb-generated/google/protobuf/struct.upb.c + src/core/ext/upb-generated/google/protobuf/timestamp.upb.c + src/core/ext/upb-generated/google/protobuf/wrappers.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/ext/upb-generated/validate/validate.upb.c + src/core/ext/upb-generated/xds/data/orca/v3/orca_load_report.upb.c + src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c + src/cpp/server/orca/orca_service.cc + test/cpp/end2end/orca_service_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(orca_service_end2end_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(orca_service_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b0e434e23f5..0ffa6a98848 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6647,6 +6647,44 @@ targets: - absl/types:variant - gpr uses_polling: false +- name: orca_service_end2end_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/upb-generated/google/api/annotations.upb.h + - src/core/ext/upb-generated/google/api/http.upb.h + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/protobuf/descriptor.upb.h + - src/core/ext/upb-generated/google/protobuf/duration.upb.h + - src/core/ext/upb-generated/google/protobuf/empty.upb.h + - src/core/ext/upb-generated/google/protobuf/struct.upb.h + - src/core/ext/upb-generated/google/protobuf/timestamp.upb.h + - src/core/ext/upb-generated/google/protobuf/wrappers.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/ext/upb-generated/validate/validate.upb.h + - src/core/ext/upb-generated/xds/data/orca/v3/orca_load_report.upb.h + - src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.h + src: + - src/proto/grpc/testing/xds/v3/orca_load_report.proto + - src/proto/grpc/testing/xds/v3/orca_service.proto + - src/core/ext/upb-generated/google/api/annotations.upb.c + - src/core/ext/upb-generated/google/api/http.upb.c + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/protobuf/descriptor.upb.c + - src/core/ext/upb-generated/google/protobuf/duration.upb.c + - src/core/ext/upb-generated/google/protobuf/empty.upb.c + - src/core/ext/upb-generated/google/protobuf/struct.upb.c + - src/core/ext/upb-generated/google/protobuf/timestamp.upb.c + - src/core/ext/upb-generated/google/protobuf/wrappers.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/ext/upb-generated/validate/validate.upb.c + - src/core/ext/upb-generated/xds/data/orca/v3/orca_load_report.upb.c + - src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c + - src/cpp/server/orca/orca_service.cc + - test/cpp/end2end/orca_service_end2end_test.cc + deps: + - grpc++_test_util - name: orphanable_test gtest: true build: test diff --git a/include/grpcpp/ext/orca_service.h b/include/grpcpp/ext/orca_service.h new file mode 100644 index 00000000000..3acf4bf030f --- /dev/null +++ b/include/grpcpp/ext/orca_service.h @@ -0,0 +1,83 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPCPP_EXT_ORCA_SERVICE_H +#define GRPCPP_EXT_ORCA_SERVICE_H + +#include +#include + +#include "absl/time/time.h" +#include "absl/types/optional.h" + +#include +#include +#include +#include +#include + +namespace grpc { +namespace experimental { + +// RPC service implementation for supplying out-of-band backend +// utilization metrics to clients. +class OrcaService : public Service { + public: + struct Options { + // Minimum report interval. If a client requests an interval lower + // than this value, this value will be used instead. + absl::Duration min_report_duration = absl::Seconds(30); + + Options() = default; + Options& set_min_report_duration(absl::Duration duration) { + min_report_duration = duration; + return *this; + } + }; + + explicit OrcaService(Options options); + + // Sets or removes the CPU utilization value to be reported to clients. + void SetCpuUtilization(double cpu_utilization); + void DeleteCpuUtilization(); + + // Sets of removes the memory utilization value to be reported to clients. + void SetMemoryUtilization(double memory_utilization); + void DeleteMemoryUtilization(); + + // Sets or removed named utilization values to be reported to clients. + void SetNamedUtilization(std::string name, double utilization); + void DeleteNamedUtilization(const std::string& name); + void SetAllNamedUtilization(std::map named_utilization); + + private: + class Reactor; + + Slice GetOrCreateSerializedResponse(); + + const absl::Duration min_report_duration_; + + grpc::internal::Mutex mu_; + double cpu_utilization_ ABSL_GUARDED_BY(&mu_) = -1; + double memory_utilization_ ABSL_GUARDED_BY(&mu_) = -1; + std::map named_utilization_ ABSL_GUARDED_BY(&mu_); + absl::optional response_slice_ ABSL_GUARDED_BY(&mu_); +}; + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_EXT_ORCA_SERVICE_H diff --git a/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c b/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c new file mode 100644 index 00000000000..cd2350ccfe9 --- /dev/null +++ b/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.c @@ -0,0 +1,47 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * xds/service/orca/v3/orca.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#include +#include "upb/msg_internal.h" +#include "xds/service/orca/v3/orca.upb.h" +#include "xds/data/orca/v3/orca_load_report.upb.h" +#include "google/protobuf/duration.upb.h" +#include "validate/validate.upb.h" + +#include "upb/port_def.inc" + +static const upb_MiniTable_Sub xds_service_orca_v3_OrcaLoadReportRequest_submsgs[1] = { + {.submsg = &google_protobuf_Duration_msginit}, +}; + +static const upb_MiniTable_Field xds_service_orca_v3_OrcaLoadReportRequest__fields[2] = { + {1, UPB_SIZE(4, 8), UPB_SIZE(1, 1), 0, 11, kUpb_FieldMode_Scalar | (kUpb_FieldRep_Pointer << kUpb_FieldRep_Shift)}, + {2, UPB_SIZE(8, 16), UPB_SIZE(0, 0), kUpb_NoSub, 9, kUpb_FieldMode_Array | (kUpb_FieldRep_Pointer << kUpb_FieldRep_Shift)}, +}; + +const upb_MiniTable xds_service_orca_v3_OrcaLoadReportRequest_msginit = { + &xds_service_orca_v3_OrcaLoadReportRequest_submsgs[0], + &xds_service_orca_v3_OrcaLoadReportRequest__fields[0], + UPB_SIZE(12, 24), 2, kUpb_ExtMode_NonExtendable, 2, 255, 0, +}; + +static const upb_MiniTable *messages_layout[1] = { + &xds_service_orca_v3_OrcaLoadReportRequest_msginit, +}; + +const upb_MiniTable_File xds_service_orca_v3_orca_proto_upb_file_layout = { + messages_layout, + NULL, + NULL, + 1, + 0, + 0, +}; + +#include "upb/port_undef.inc" + diff --git a/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.h b/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.h new file mode 100644 index 00000000000..53f348f248e --- /dev/null +++ b/src/core/ext/upb-generated/xds/service/orca/v3/orca.upb.h @@ -0,0 +1,109 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * xds/service/orca/v3/orca.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#ifndef XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ +#define XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ + +#include "upb/msg_internal.h" +#include "upb/decode.h" +#include "upb/decode_fast.h" +#include "upb/encode.h" + +#include "upb/port_def.inc" + +#ifdef __cplusplus +extern "C" { +#endif + +struct xds_service_orca_v3_OrcaLoadReportRequest; +typedef struct xds_service_orca_v3_OrcaLoadReportRequest xds_service_orca_v3_OrcaLoadReportRequest; +extern const upb_MiniTable xds_service_orca_v3_OrcaLoadReportRequest_msginit; +struct google_protobuf_Duration; +extern const upb_MiniTable google_protobuf_Duration_msginit; + + + +/* xds.service.orca.v3.OrcaLoadReportRequest */ + +UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_new(upb_Arena* arena) { + return (xds_service_orca_v3_OrcaLoadReportRequest*)_upb_Message_New(&xds_service_orca_v3_OrcaLoadReportRequest_msginit, arena); +} +UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_parse(const char* buf, size_t size, upb_Arena* arena) { + xds_service_orca_v3_OrcaLoadReportRequest* ret = xds_service_orca_v3_OrcaLoadReportRequest_new(arena); + if (!ret) return NULL; + if (upb_Decode(buf, size, ret, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, NULL, 0, arena) != kUpb_DecodeStatus_Ok) { + return NULL; + } + return ret; +} +UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_parse_ex(const char* buf, size_t size, + const upb_ExtensionRegistry* extreg, + int options, upb_Arena* arena) { + xds_service_orca_v3_OrcaLoadReportRequest* ret = xds_service_orca_v3_OrcaLoadReportRequest_new(arena); + if (!ret) return NULL; + if (upb_Decode(buf, size, ret, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, extreg, options, arena) != + kUpb_DecodeStatus_Ok) { + return NULL; + } + return ret; +} +UPB_INLINE char* xds_service_orca_v3_OrcaLoadReportRequest_serialize(const xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_Arena* arena, size_t* len) { + return upb_Encode(msg, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, 0, arena, len); +} +UPB_INLINE char* xds_service_orca_v3_OrcaLoadReportRequest_serialize_ex(const xds_service_orca_v3_OrcaLoadReportRequest* msg, int options, + upb_Arena* arena, size_t* len) { + return upb_Encode(msg, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, options, arena, len); +} +UPB_INLINE bool xds_service_orca_v3_OrcaLoadReportRequest_has_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { + return _upb_hasbit(msg, 1); +} +UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_clear_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { + *UPB_PTR_AT(msg, UPB_SIZE(4, 8), const upb_Message*) = NULL; +} +UPB_INLINE const struct google_protobuf_Duration* xds_service_orca_v3_OrcaLoadReportRequest_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { + return *UPB_PTR_AT(msg, UPB_SIZE(4, 8), const struct google_protobuf_Duration*); +} +UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_clear_request_cost_names(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { + _upb_array_detach(msg, UPB_SIZE(8, 16)); +} +UPB_INLINE upb_StringView const* xds_service_orca_v3_OrcaLoadReportRequest_request_cost_names(const xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t* len) { + return (upb_StringView const*)_upb_array_accessor(msg, UPB_SIZE(8, 16), len); +} + +UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_set_report_interval(xds_service_orca_v3_OrcaLoadReportRequest *msg, struct google_protobuf_Duration* value) { + _upb_sethas(msg, 1); + *UPB_PTR_AT(msg, UPB_SIZE(4, 8), struct google_protobuf_Duration*) = value; +} +UPB_INLINE struct google_protobuf_Duration* xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval(xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_Arena* arena) { + struct google_protobuf_Duration* sub = (struct google_protobuf_Duration*)xds_service_orca_v3_OrcaLoadReportRequest_report_interval(msg); + if (sub == NULL) { + sub = (struct google_protobuf_Duration*)_upb_Message_New(&google_protobuf_Duration_msginit, arena); + if (!sub) return NULL; + xds_service_orca_v3_OrcaLoadReportRequest_set_report_interval(msg, sub); + } + return sub; +} +UPB_INLINE upb_StringView* xds_service_orca_v3_OrcaLoadReportRequest_mutable_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t* len) { + return (upb_StringView*)_upb_array_mutable_accessor(msg, UPB_SIZE(8, 16), len); +} +UPB_INLINE upb_StringView* xds_service_orca_v3_OrcaLoadReportRequest_resize_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t len, upb_Arena* arena) { + return (upb_StringView*)_upb_Array_Resize_accessor2(msg, UPB_SIZE(8, 16), len, UPB_SIZE(3, 4), arena); +} +UPB_INLINE bool xds_service_orca_v3_OrcaLoadReportRequest_add_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_StringView val, upb_Arena* arena) { + return _upb_Array_Append_accessor2(msg, UPB_SIZE(8, 16), UPB_SIZE(3, 4), &val, arena); +} + +extern const upb_MiniTable_File xds_service_orca_v3_orca_proto_upb_file_layout; + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#include "upb/port_undef.inc" + +#endif /* XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ */ diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc new file mode 100644 index 00000000000..2d84a7c4ab2 --- /dev/null +++ b/src/cpp/server/orca/orca_service.cc @@ -0,0 +1,224 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "google/protobuf/duration.upb.h" +#include "upb/upb.hpp" +#include "xds/data/orca/v3/orca_load_report.upb.h" +#include "xds/service/orca/v3/orca.upb.h" + +#include +#include + +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/timer.h" + +namespace grpc { +namespace experimental { + +// +// OrcaService::Reactor +// + +class OrcaService::Reactor : public ServerWriteReactor, + public grpc_core::RefCounted { + public: + explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) + : service_(service) { + GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); + // Get slice from request. + Slice slice; + request_buffer->DumpToSingleSlice(&slice); + // Parse request proto. + upb::Arena arena; + xds_service_orca_v3_OrcaLoadReportRequest* request = + xds_service_orca_v3_OrcaLoadReportRequest_parse( + reinterpret_cast(slice.begin()), slice.size(), + arena.ptr()); + if (request == nullptr) { + Finish(Status(StatusCode::INTERNAL, "could not parse request proto")); + return; + } + const auto* duration_proto = + xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request); + if (duration_proto != nullptr) { + report_interval_ = grpc_core::Duration::FromSecondsAndNanoseconds( + google_protobuf_Duration_seconds(duration_proto), + google_protobuf_Duration_nanos(duration_proto)); + } + auto min_interval = grpc_core::Duration::Milliseconds( + service_->min_report_duration_ / absl::Milliseconds(1)); + if (report_interval_ < min_interval) report_interval_ = min_interval; + // Send initial response. + SendResponse(); + } + + void OnWriteDone(bool ok) override { + if (!ok) { + Finish(Status(StatusCode::UNKNOWN, "write failed")); + return; + } + response_.Clear(); + ScheduleTimer(); + } + + void OnCancel() override { + MaybeCancelTimer(); + Finish(Status(StatusCode::UNKNOWN, "call cancelled by client")); + } + + void OnDone() override { + // Free the initial ref from instantiation. + Unref(); + } + + private: + void SendResponse() { + Slice response_slice = service_->GetOrCreateSerializedResponse(); + ByteBuffer response_buffer(&response_slice, 1); + response_.Swap(&response_buffer); + StartWrite(&response_); + } + + void ScheduleTimer() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + Ref().release(); // Ref held by timer. + grpc::internal::MutexLock lock(&timer_mu_); + timer_pending_ = true; + grpc_timer_init(&timer_, exec_ctx.Now() + report_interval_, &on_timer_); + } + + void MaybeCancelTimer() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc::internal::MutexLock lock(&timer_mu_); + if (timer_pending_) { + timer_pending_ = false; + grpc_timer_cancel(&timer_); + } + } + + static void OnTimer(void* arg, grpc_error_handle error) { + grpc_core::RefCountedPtr self(static_cast(arg)); + grpc::internal::MutexLock lock(&self->timer_mu_); + if (error == GRPC_ERROR_NONE && self->timer_pending_) { + self->timer_pending_ = false; + self->SendResponse(); + } + } + + OrcaService* service_; + + // TODO(roth): Change this to use the EventEngine API once it becomes + // available. + grpc::internal::Mutex timer_mu_; + bool timer_pending_ ABSL_GUARDED_BY(&timer_mu_) = false; + grpc_timer timer_ ABSL_GUARDED_BY(&timer_mu_); + grpc_closure on_timer_; + + grpc_core::Duration report_interval_; + ByteBuffer response_; +}; + +// +// OrcaService +// + +OrcaService::OrcaService(OrcaService::Options options) + : min_report_duration_(options.min_report_duration) { + AddMethod(new internal::RpcServiceMethod( + "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics", + internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr)); + MarkMethodCallback( + 0, new internal::CallbackServerStreamingHandler( + [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { + return new Reactor(this, request); + })); +} + +void OrcaService::SetCpuUtilization(double cpu_utilization) { + grpc::internal::MutexLock lock(&mu_); + cpu_utilization_ = cpu_utilization; + response_slice_.reset(); +} + +void OrcaService::DeleteCpuUtilization() { + grpc::internal::MutexLock lock(&mu_); + cpu_utilization_ = -1; + response_slice_.reset(); +} + +void OrcaService::SetMemoryUtilization(double memory_utilization) { + grpc::internal::MutexLock lock(&mu_); + memory_utilization_ = memory_utilization; + response_slice_.reset(); +} + +void OrcaService::DeleteMemoryUtilization() { + grpc::internal::MutexLock lock(&mu_); + memory_utilization_ = -1; + response_slice_.reset(); +} + +void OrcaService::SetNamedUtilization(std::string name, double utilization) { + grpc::internal::MutexLock lock(&mu_); + named_utilization_[std::move(name)] = utilization; + response_slice_.reset(); +} + +void OrcaService::DeleteNamedUtilization(const std::string& name) { + grpc::internal::MutexLock lock(&mu_); + named_utilization_.erase(name); + response_slice_.reset(); +} + +void OrcaService::SetAllNamedUtilization( + std::map named_utilization) { + grpc::internal::MutexLock lock(&mu_); + named_utilization_ = std::move(named_utilization); + response_slice_.reset(); +} + +Slice OrcaService::GetOrCreateSerializedResponse() { + grpc::internal::MutexLock lock(&mu_); + if (!response_slice_.has_value()) { + upb::Arena arena; + xds_data_orca_v3_OrcaLoadReport* response = + xds_data_orca_v3_OrcaLoadReport_new(arena.ptr()); + if (cpu_utilization_ != -1) { + xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response, + cpu_utilization_); + } + if (memory_utilization_ != -1) { + xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response, + memory_utilization_); + } + for (const auto& p : named_utilization_) { + xds_data_orca_v3_OrcaLoadReport_utilization_set( + response, + upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), + p.second, arena.ptr()); + } + size_t buf_length; + char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(), + &buf_length); + response_slice_.emplace(buf, buf_length); + } + return Slice(*response_slice_); +} + +} // namespace experimental +} // namespace grpc diff --git a/src/proto/grpc/testing/xds/v3/BUILD b/src/proto/grpc/testing/xds/v3/BUILD index fe2ee270bf9..87f6f997159 100644 --- a/src/proto/grpc/testing/xds/v3/BUILD +++ b/src/proto/grpc/testing/xds/v3/BUILD @@ -167,6 +167,18 @@ grpc_proto_library( ], ) +grpc_proto_library( + name = "orca_service_proto", + srcs = [ + "orca_service.proto", + ], + has_services = True, + well_known_protos = True, + deps = [ + "orca_load_report_proto", + ], +) + grpc_proto_library( name = "protocol_proto", srcs = [ diff --git a/src/proto/grpc/testing/xds/v3/orca_service.proto b/src/proto/grpc/testing/xds/v3/orca_service.proto new file mode 100644 index 00000000000..4ba5a46edc7 --- /dev/null +++ b/src/proto/grpc/testing/xds/v3/orca_service.proto @@ -0,0 +1,47 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Local copy of Envoy xDS proto file, used for testing only. + +syntax = "proto3"; + +package xds.service.orca.v3; + +import "src/proto/grpc/testing/xds/v3/orca_load_report.proto"; + +import "google/protobuf/duration.proto"; + +// See section `Out-of-band (OOB) reporting` of the design document in +// :ref:`https://github.com/envoyproxy/envoy/issues/6614`. + +// Out-of-band (OOB) load reporting service for the additional load reporting +// agent that does not sit in the request path. Reports are periodically sampled +// with sufficient frequency to provide temporal association with requests. +// OOB reporting compensates the limitation of in-band reporting in revealing +// costs for backends that do not provide a steady stream of telemetry such as +// long running stream operations and zero QPS services. This is a server +// streaming service, client needs to terminate current RPC and initiate +// a new call to change backend reporting frequency. +service OpenRcaService { + rpc StreamCoreMetrics(OrcaLoadReportRequest) returns (stream xds.data.orca.v3.OrcaLoadReport); +} + +message OrcaLoadReportRequest { + // Interval for generating Open RCA core metric responses. + google.protobuf.Duration report_interval = 1; + // Request costs to collect. If this is empty, all known requests costs tracked by + // the load reporting agent will be returned. This provides an opportunity for + // the client to selectively obtain a subset of tracked costs. + repeated string request_cost_names = 2; +} diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 22e71f5b9c5..68cca0e6114 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -928,3 +928,18 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "orca_service_end2end_test", + srcs = ["orca_service_end2end_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + "//:grpc++", + "//:grpcpp_orca", + "//src/proto/grpc/testing/xds/v3:orca_service_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/end2end/orca_service_end2end_test.cc b/test/cpp/end2end/orca_service_end2end_test.cc new file mode 100644 index 00000000000..fd727f48a69 --- /dev/null +++ b/test/cpp/end2end/orca_service_end2end_test.cc @@ -0,0 +1,200 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include +#include + +#include "absl/strings/str_cat.h" +#include "absl/types/optional.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/gprpp/time.h" +#include "src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h" +#include "src/proto/grpc/testing/xds/v3/orca_service.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +using xds::data::orca::v3::OrcaLoadReport; +using xds::service::orca::v3::OpenRcaService; +using xds::service::orca::v3::OrcaLoadReportRequest; + +namespace grpc { +namespace testing { +namespace { + +using experimental::OrcaService; + +class OrcaServiceEnd2endTest : public ::testing::Test { + protected: + // A wrapper for the client stream that ensures that responses come + // back at the requested interval. + class Stream { + public: + Stream(OpenRcaService::Stub* stub, grpc_core::Duration requested_interval) + : requested_interval_(requested_interval) { + OrcaLoadReportRequest request; + gpr_timespec timespec = requested_interval.as_timespec(); + auto* interval_proto = request.mutable_report_interval(); + interval_proto->set_seconds(timespec.tv_sec); + interval_proto->set_nanos(timespec.tv_nsec); + stream_ = stub->StreamCoreMetrics(&context_, request); + } + + ~Stream() { context_.TryCancel(); } + + OrcaLoadReport ReadResponse() { + OrcaLoadReport response; + EXPECT_TRUE(stream_->Read(&response)); + auto now = grpc_core::Timestamp::FromTimespecRoundDown( + gpr_now(GPR_CLOCK_MONOTONIC)); + if (last_response_time_.has_value()) { + // Allow a small fudge factor to avoid test flakiness. + const grpc_core::Duration fudge_factor = + grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(); + auto elapsed = now - *last_response_time_; + EXPECT_GE(elapsed, requested_interval_ - fudge_factor) + << elapsed.ToString(); + EXPECT_LE(elapsed, requested_interval_ + fudge_factor) + << elapsed.ToString(); + } + last_response_time_ = now; + return response; + } + + private: + const grpc_core::Duration requested_interval_; + ClientContext context_; + std::unique_ptr> stream_; + absl::optional last_response_time_; + }; + + OrcaServiceEnd2endTest() + : orca_service_(OrcaService::Options().set_min_report_duration( + absl::ZeroDuration())) { + std::string server_address = + absl::StrCat("localhost:", grpc_pick_unused_port_or_die()); + ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&orca_service_); + server_ = builder.BuildAndStart(); + gpr_log(GPR_INFO, "server started on %s", server_address_.c_str()); + auto channel = CreateChannel(server_address, InsecureChannelCredentials()); + stub_ = OpenRcaService::NewStub(channel); + } + + ~OrcaServiceEnd2endTest() override { server_->Shutdown(); } + + std::string server_address_; + OrcaService orca_service_; + std::unique_ptr server_; + std::unique_ptr stub_; +}; + +TEST_F(OrcaServiceEnd2endTest, Basic) { + constexpr char kMetricName1[] = "foo"; + constexpr char kMetricName2[] = "bar"; + constexpr char kMetricName3[] = "baz"; + constexpr char kMetricName4[] = "quux"; + // Start stream1 with 5s interval and stream2 with 2.5s interval. + // Throughout the test, we should get two responses on stream2 for + // every one response on stream1. + Stream stream1(stub_.get(), grpc_core::Duration::Milliseconds(5000)); + Stream stream2(stub_.get(), grpc_core::Duration::Milliseconds(2500)); + auto ReadResponses = [&](std::function checker) { + gpr_log(GPR_INFO, "reading response from stream1"); + OrcaLoadReport response = stream1.ReadResponse(); + checker(response); + gpr_log(GPR_INFO, "reading response from stream2"); + response = stream2.ReadResponse(); + checker(response); + gpr_log(GPR_INFO, "reading response from stream2"); + response = stream2.ReadResponse(); + checker(response); + }; + // Initial response should not have any values populated. + ReadResponses([](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0); + EXPECT_EQ(response.mem_utilization(), 0); + EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); + }); + // Now set CPU utilization on the server. + orca_service_.SetCpuUtilization(0.5); + ReadResponses([](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0.5); + EXPECT_EQ(response.mem_utilization(), 0); + EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); + }); + // Update CPU utilization and set memory utilization. + orca_service_.SetCpuUtilization(0.8); + orca_service_.SetMemoryUtilization(0.4); + ReadResponses([](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0.8); + EXPECT_EQ(response.mem_utilization(), 0.4); + EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); + }); + // Unset CPU and memory utilization and set a named utilization. + orca_service_.DeleteCpuUtilization(); + orca_service_.DeleteMemoryUtilization(); + orca_service_.SetNamedUtilization(kMetricName1, 0.3); + ReadResponses([&](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0); + EXPECT_EQ(response.mem_utilization(), 0); + EXPECT_THAT( + response.utilization(), + ::testing::UnorderedElementsAre(::testing::Pair(kMetricName1, 0.3))); + }); + // Unset the previous named utilization and set two new ones. + orca_service_.DeleteNamedUtilization(kMetricName1); + orca_service_.SetNamedUtilization(kMetricName2, 0.2); + orca_service_.SetNamedUtilization(kMetricName3, 0.1); + ReadResponses([&](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0); + EXPECT_EQ(response.mem_utilization(), 0); + EXPECT_THAT( + response.utilization(), + ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.2), + ::testing::Pair(kMetricName3, 0.1))); + }); + // Replace the entire named metric map at once. + orca_service_.SetAllNamedUtilization( + {{kMetricName2, 0.5}, {kMetricName4, 0.9}}); + ReadResponses([&](const OrcaLoadReport& response) { + EXPECT_EQ(response.cpu_utilization(), 0); + EXPECT_EQ(response.mem_utilization(), 0); + EXPECT_THAT( + response.utilization(), + ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.5), + ::testing::Pair(kMetricName4, 0.9))); + }); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 5e23594c239..434063a37ee 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5499,6 +5499,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "orca_service_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,