[Test] Fix bug in waiting for the Orca OOB report (#35467)

Make sure there is no unnecessary delays when there are multiple reports in the queue.

This change also adds a test for the custom LB policy.

Closes #35467

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35467 from eugeneo:tasks/orca-test-timeout-316026521 4aab50a118
PiperOrigin-RevId: 597007131
pull/35491/head^2
Eugene Ostroukhov 1 year ago committed by Copybara-Service
parent 6eaec4f96e
commit e73b76a7da
  1. 50
      CMakeLists.txt
  2. 18
      build_autogenerated.yaml
  3. 16
      test/cpp/interop/BUILD
  4. 15
      test/cpp/interop/backend_metrics_lb_policy.cc
  5. 159
      test/cpp/interop/backend_metrics_lb_policy_test.cc
  6. 10
      test/cpp/interop/interop_client.cc
  7. 24
      tools/run_tests/generated/tests.json

50
CMakeLists.txt generated

@ -880,6 +880,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx avl_test) add_dependencies(buildtests_cxx avl_test)
add_dependencies(buildtests_cxx aws_request_signer_test) add_dependencies(buildtests_cxx aws_request_signer_test)
add_dependencies(buildtests_cxx b64_test) add_dependencies(buildtests_cxx b64_test)
add_dependencies(buildtests_cxx backend_metrics_lb_policy_test)
add_dependencies(buildtests_cxx backoff_test) add_dependencies(buildtests_cxx backoff_test)
add_dependencies(buildtests_cxx bad_ping_test) add_dependencies(buildtests_cxx bad_ping_test)
add_dependencies(buildtests_cxx bad_server_response_test) add_dependencies(buildtests_cxx bad_server_response_test)
@ -6853,6 +6854,55 @@ target_link_libraries(b64_test
) )
endif()
if(gRPC_BUILD_TESTS)
add_executable(backend_metrics_lb_policy_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h
src/cpp/server/orca/orca_service.cc
test/cpp/interop/backend_metrics_lb_policy.cc
test/cpp/interop/backend_metrics_lb_policy_test.cc
)
target_compile_features(backend_metrics_lb_policy_test PUBLIC cxx_std_14)
target_include_directories(backend_metrics_lb_policy_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(backend_metrics_lb_policy_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc++
grpc_test_util
grpc++_test_config
)
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)

@ -5849,6 +5849,24 @@ targets:
- gtest - gtest
- grpc_test_util - grpc_test_util
uses_polling: false uses_polling: false
- name: backend_metrics_lb_policy_test
gtest: true
build: test
language: c++
headers:
- test/cpp/interop/backend_metrics_lb_policy.h
src:
- src/proto/grpc/testing/empty.proto
- src/proto/grpc/testing/messages.proto
- src/proto/grpc/testing/test.proto
- src/cpp/server/orca/orca_service.cc
- test/cpp/interop/backend_metrics_lb_policy.cc
- test/cpp/interop/backend_metrics_lb_policy_test.cc
deps:
- gtest
- grpc++
- grpc_test_util
- grpc++_test_config
- name: backoff_test - name: backoff_test
gtest: true gtest: true
build: test build: test

@ -532,6 +532,22 @@ grpc_cc_library(
], ],
) )
grpc_cc_test(
name = "backend_metrics_lb_policy_test",
srcs = [
"backend_metrics_lb_policy_test.cc",
],
external_deps = ["gtest"],
deps = [
":backend_metrics_lb_policy",
"//:grpc++",
"//:grpcpp_orca_service",
"//src/proto/grpc/testing:test_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
],
)
grpc_cc_library( grpc_cc_library(
name = "rpc_behavior_lb_policy", name = "rpc_behavior_lb_policy",
srcs = [ srcs = [

@ -86,6 +86,11 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy {
} }
absl::Status UpdateLocked(UpdateArgs args) override { absl::Status UpdateLocked(UpdateArgs args) override {
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
grpc_core::Json::FromArray({grpc_core::Json::FromObject(
{{"pick_first", grpc_core::Json::FromObject({})}})}));
args.config = std::move(config.value());
return delegate_->UpdateLocked(std::move(args)); return delegate_->UpdateLocked(std::move(args));
} }
@ -249,14 +254,12 @@ LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
grpc_core::MutexLock lock(&load_reports_mu_); grpc_core::MutexLock lock(&load_reports_mu_);
// This condition will be called under lock // This condition will be called under lock
for (size_t i = 0; i < max_attempts; i++) { for (size_t i = 0; i < max_attempts; i++) {
auto deadline = absl::Now() + poll_timeout; if (oob_load_reports_.empty()) {
// loop to handle spurious wakeups. load_reports_cv_.WaitWithTimeout(&load_reports_mu_, poll_timeout);
do { if (oob_load_reports_.empty()) {
if (absl::Now() >= deadline) {
return absl::nullopt; return absl::nullopt;
} }
load_reports_cv_.WaitWithDeadline(&load_reports_mu_, deadline); }
} while (oob_load_reports_.empty());
auto report = std::move(oob_load_reports_.front()); auto report = std::move(oob_load_reports_.front());
oob_load_reports_.pop_front(); oob_load_reports_.pop_front();
if (predicate(report)) { if (predicate(report)) {

@ -0,0 +1,159 @@
//
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "test/cpp/interop/backend_metrics_lb_policy.h"
#include <memory>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/ext/orca_service.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
namespace grpc {
namespace testing {
namespace {
class EchoServiceImpl : public grpc::testing::TestService::CallbackService {
public:
grpc::ServerUnaryReactor* UnaryCall(
grpc::CallbackServerContext* context,
const grpc::testing::SimpleRequest* /* request */,
grpc::testing::SimpleResponse* /* response */) override {
auto reactor = context->DefaultReactor();
reactor->Finish(grpc::Status::OK);
return reactor;
}
};
class Server {
public:
Server() : port_(grpc_pick_unused_port_or_die()) {
server_thread_ = std::thread(ServerLoop, this);
grpc_core::MutexLock lock(&mu_);
cond_.WaitWithTimeout(&mu_, absl::Seconds(1));
}
~Server() {
server_->Shutdown();
server_thread_.join();
}
std::string address() const { return absl::StrCat("localhost:", port_); }
private:
static void ServerLoop(Server* server) { server->Run(); }
void Run() {
ServerBuilder builder;
EchoServiceImpl service;
auto server_metric_recorder =
grpc::experimental::ServerMetricRecorder::Create();
server_metric_recorder->SetCpuUtilization(.5f);
grpc::experimental::OrcaService orca_service(
server_metric_recorder.get(),
grpc::experimental::OrcaService::Options().set_min_report_duration(
absl::Seconds(1)));
builder.RegisterService(&orca_service);
builder.RegisterService(&service);
builder.AddListeningPort(address(), InsecureServerCredentials());
auto grpc_server = builder.BuildAndStart();
server_ = grpc_server.get();
{
grpc_core::MutexLock lock(&mu_);
cond_.SignalAll();
}
grpc_server->Wait();
}
int port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
std::thread server_thread_;
grpc::Server* server_;
};
TEST(BackendMetricsLbPolicyTest, TestOobMetricsReceipt) {
LoadReportTracker tracker;
grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
Server server;
ChannelArguments args = tracker.GetChannelArguments();
args.SetLoadBalancingPolicyName("test_backend_metrics_load_balancer");
auto channel = grpc::CreateCustomChannel(server.address(),
InsecureChannelCredentials(), args);
auto stub = grpc::testing::TestService::Stub(channel);
ClientContext ctx;
SimpleRequest req;
SimpleResponse res;
grpc_core::Mutex mu;
grpc_core::CondVar cond;
absl::optional<Status> status;
stub.async()->UnaryCall(&ctx, &req, &res, [&](auto s) {
grpc_core::MutexLock lock(&mu);
status = s;
cond.SignalAll();
});
// This report is sent on start, available immediately
auto report = tracker.WaitForOobLoadReport(
[](auto report) { return report.cpu_utilization() == 0.5; },
absl::Milliseconds(1500), 3);
ASSERT_TRUE(report.has_value());
EXPECT_EQ(report->cpu_utilization(), 0.5);
for (size_t i = 0; i < 3; i++) {
// Wait for slightly more than 1 min
report = tracker.WaitForOobLoadReport(
[](auto report) { return report.cpu_utilization() == 0.5; },
absl::Milliseconds(1500), 3);
ASSERT_TRUE(report.has_value());
EXPECT_EQ(report->cpu_utilization(), 0.5);
}
{
grpc_core::MutexLock lock(&mu);
if (!status.has_value()) {
cond.Wait(&mu);
}
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), grpc::StatusCode::OK);
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -40,6 +40,7 @@
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/crash.h"
#include "src/proto/grpc/testing/empty.pb.h" #include "src/proto/grpc/testing/empty.pb.h"
@ -1022,8 +1023,13 @@ bool InteropClient::DoOrcaPerRpc() {
bool InteropClient::DoOrcaOob() { bool InteropClient::DoOrcaOob() {
static constexpr auto kTimeout = absl::Seconds(10); static constexpr auto kTimeout = absl::Seconds(10);
gpr_log(GPR_DEBUG, "testing orca oob"); gpr_log(GPR_INFO, "testing orca oob");
load_report_tracker_.ResetCollectedLoadReports(); load_report_tracker_.ResetCollectedLoadReports();
// Make the backup poller poll very frequently in order to pick up
// updates from all the subchannels's FDs.
grpc_core::ConfigVars::Overrides overrides;
overrides.client_channel_backup_poll_interval_ms = 250;
grpc_core::ConfigVars::SetOverrides(overrides);
grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy); grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
ClientContext context; ClientContext context;
std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
@ -1088,7 +1094,7 @@ bool InteropClient::DoOrcaOob() {
kTimeout, 10) kTimeout, 10)
.has_value()); .has_value());
} }
gpr_log(GPR_DEBUG, "orca oob successfully finished"); gpr_log(GPR_INFO, "orca oob successfully finished");
return true; return true;
} }

@ -803,6 +803,30 @@
], ],
"uses_polling": false "uses_polling": false
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "backend_metrics_lb_policy_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save