diff --git a/CMakeLists.txt b/CMakeLists.txt index ab169d9afd3..1dbc6db5e0e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -880,6 +880,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx avl_test) add_dependencies(buildtests_cxx aws_request_signer_test) add_dependencies(buildtests_cxx b64_test) + add_dependencies(buildtests_cxx backend_metrics_lb_policy_test) add_dependencies(buildtests_cxx backoff_test) add_dependencies(buildtests_cxx bad_ping_test) add_dependencies(buildtests_cxx bad_server_response_test) @@ -6853,6 +6854,55 @@ target_link_libraries(b64_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(backend_metrics_lb_policy_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h + src/cpp/server/orca/orca_service.cc + test/cpp/interop/backend_metrics_lb_policy.cc + test/cpp/interop/backend_metrics_lb_policy_test.cc +) +target_compile_features(backend_metrics_lb_policy_test PUBLIC cxx_std_14) +target_include_directories(backend_metrics_lb_policy_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(backend_metrics_lb_policy_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + grpc++ + grpc_test_util + grpc++_test_config +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 92437f0ce8a..9f9385d801d 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5849,6 +5849,24 @@ targets: - gtest - grpc_test_util uses_polling: false +- name: backend_metrics_lb_policy_test + gtest: true + build: test + language: c++ + headers: + - test/cpp/interop/backend_metrics_lb_policy.h + src: + - src/proto/grpc/testing/empty.proto + - src/proto/grpc/testing/messages.proto + - src/proto/grpc/testing/test.proto + - src/cpp/server/orca/orca_service.cc + - test/cpp/interop/backend_metrics_lb_policy.cc + - test/cpp/interop/backend_metrics_lb_policy_test.cc + deps: + - gtest + - grpc++ + - grpc_test_util + - grpc++_test_config - name: backoff_test gtest: true build: test diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD index 8262b29ab27..6d5b80422c9 100644 --- a/test/cpp/interop/BUILD +++ b/test/cpp/interop/BUILD @@ -532,6 +532,22 @@ grpc_cc_library( ], ) +grpc_cc_test( + name = "backend_metrics_lb_policy_test", + srcs = [ + "backend_metrics_lb_policy_test.cc", + ], + external_deps = ["gtest"], + deps = [ + ":backend_metrics_lb_policy", + "//:grpc++", + "//:grpcpp_orca_service", + "//src/proto/grpc/testing:test_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_config", + ], +) + grpc_cc_library( name = "rpc_behavior_lb_policy", srcs = [ diff --git a/test/cpp/interop/backend_metrics_lb_policy.cc b/test/cpp/interop/backend_metrics_lb_policy.cc index a10bd4f63a5..30f3bb18943 100644 --- a/test/cpp/interop/backend_metrics_lb_policy.cc +++ b/test/cpp/interop/backend_metrics_lb_policy.cc @@ -86,6 +86,11 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy { } absl::Status UpdateLocked(UpdateArgs args) override { + auto config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + grpc_core::Json::FromArray({grpc_core::Json::FromObject( + {{"pick_first", grpc_core::Json::FromObject({})}})})); + args.config = std::move(config.value()); return delegate_->UpdateLocked(std::move(args)); } @@ -249,14 +254,12 @@ LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport( grpc_core::MutexLock lock(&load_reports_mu_); // This condition will be called under lock for (size_t i = 0; i < max_attempts; i++) { - auto deadline = absl::Now() + poll_timeout; - // loop to handle spurious wakeups. - do { - if (absl::Now() >= deadline) { + if (oob_load_reports_.empty()) { + load_reports_cv_.WaitWithTimeout(&load_reports_mu_, poll_timeout); + if (oob_load_reports_.empty()) { return absl::nullopt; } - load_reports_cv_.WaitWithDeadline(&load_reports_mu_, deadline); - } while (oob_load_reports_.empty()); + } auto report = std::move(oob_load_reports_.front()); oob_load_reports_.pop_front(); if (predicate(report)) { diff --git a/test/cpp/interop/backend_metrics_lb_policy_test.cc b/test/cpp/interop/backend_metrics_lb_policy_test.cc new file mode 100644 index 00000000000..8ceae112fa8 --- /dev/null +++ b/test/cpp/interop/backend_metrics_lb_policy_test.cc @@ -0,0 +1,159 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "test/cpp/interop/backend_metrics_lb_policy.h" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "src/core/lib/config/config_vars.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/proto/grpc/testing/messages.pb.h" +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { +namespace { + +class EchoServiceImpl : public grpc::testing::TestService::CallbackService { + public: + grpc::ServerUnaryReactor* UnaryCall( + grpc::CallbackServerContext* context, + const grpc::testing::SimpleRequest* /* request */, + grpc::testing::SimpleResponse* /* response */) override { + auto reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; + } +}; + +class Server { + public: + Server() : port_(grpc_pick_unused_port_or_die()) { + server_thread_ = std::thread(ServerLoop, this); + grpc_core::MutexLock lock(&mu_); + cond_.WaitWithTimeout(&mu_, absl::Seconds(1)); + } + + ~Server() { + server_->Shutdown(); + server_thread_.join(); + } + + std::string address() const { return absl::StrCat("localhost:", port_); } + + private: + static void ServerLoop(Server* server) { server->Run(); } + + void Run() { + ServerBuilder builder; + EchoServiceImpl service; + auto server_metric_recorder = + grpc::experimental::ServerMetricRecorder::Create(); + server_metric_recorder->SetCpuUtilization(.5f); + grpc::experimental::OrcaService orca_service( + server_metric_recorder.get(), + grpc::experimental::OrcaService::Options().set_min_report_duration( + absl::Seconds(1))); + builder.RegisterService(&orca_service); + builder.RegisterService(&service); + builder.AddListeningPort(address(), InsecureServerCredentials()); + auto grpc_server = builder.BuildAndStart(); + server_ = grpc_server.get(); + { + grpc_core::MutexLock lock(&mu_); + cond_.SignalAll(); + } + grpc_server->Wait(); + } + + int port_; + grpc_core::Mutex mu_; + grpc_core::CondVar cond_; + std::thread server_thread_; + grpc::Server* server_; +}; + +TEST(BackendMetricsLbPolicyTest, TestOobMetricsReceipt) { + LoadReportTracker tracker; + grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy); + Server server; + ChannelArguments args = tracker.GetChannelArguments(); + args.SetLoadBalancingPolicyName("test_backend_metrics_load_balancer"); + auto channel = grpc::CreateCustomChannel(server.address(), + InsecureChannelCredentials(), args); + auto stub = grpc::testing::TestService::Stub(channel); + ClientContext ctx; + SimpleRequest req; + SimpleResponse res; + grpc_core::Mutex mu; + grpc_core::CondVar cond; + absl::optional status; + + stub.async()->UnaryCall(&ctx, &req, &res, [&](auto s) { + grpc_core::MutexLock lock(&mu); + status = s; + cond.SignalAll(); + }); + // This report is sent on start, available immediately + auto report = tracker.WaitForOobLoadReport( + [](auto report) { return report.cpu_utilization() == 0.5; }, + absl::Milliseconds(1500), 3); + ASSERT_TRUE(report.has_value()); + EXPECT_EQ(report->cpu_utilization(), 0.5); + for (size_t i = 0; i < 3; i++) { + // Wait for slightly more than 1 min + report = tracker.WaitForOobLoadReport( + [](auto report) { return report.cpu_utilization() == 0.5; }, + absl::Milliseconds(1500), 3); + ASSERT_TRUE(report.has_value()); + EXPECT_EQ(report->cpu_utilization(), 0.5); + } + { + grpc_core::MutexLock lock(&mu); + if (!status.has_value()) { + cond.Wait(&mu); + } + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(status->error_code(), grpc::StatusCode::OK); + } +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 04bc8022d27..0185957408e 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -40,6 +40,7 @@ #include #include +#include "src/core/lib/config/config_vars.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/crash.h" #include "src/proto/grpc/testing/empty.pb.h" @@ -1022,8 +1023,13 @@ bool InteropClient::DoOrcaPerRpc() { bool InteropClient::DoOrcaOob() { static constexpr auto kTimeout = absl::Seconds(10); - gpr_log(GPR_DEBUG, "testing orca oob"); + gpr_log(GPR_INFO, "testing orca oob"); load_report_tracker_.ResetCollectedLoadReports(); + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + grpc_core::ConfigVars::Overrides overrides; + overrides.client_channel_backup_poll_interval_ms = 250; + grpc_core::ConfigVars::SetOverrides(overrides); grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy); ClientContext context; std::unique_ptr