[testing]: Add "orca_oob" test case (#32599)

pull/32623/head
Eugene Ostroukhov 2 years ago committed by GitHub
parent 555f3e26a4
commit acec3a6975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CMakeLists.txt
  2. 1
      build_autogenerated.yaml
  3. 3
      src/proto/grpc/testing/messages.proto
  4. 1
      test/cpp/interop/BUILD
  5. 61
      test/cpp/interop/backend_metrics_lb_policy.cc
  6. 13
      test/cpp/interop/backend_metrics_lb_policy.h
  7. 3
      test/cpp/interop/client.cc
  8. 140
      test/cpp/interop/interop_client.cc
  9. 1
      test/cpp/interop/interop_client.h
  10. 59
      test/cpp/interop/interop_server.cc

1
CMakeLists.txt generated

@ -14169,6 +14169,7 @@ add_executable(interop_server
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
src/cpp/server/orca/orca_service.cc
test/cpp/interop/interop_server.cc
test/cpp/interop/interop_server_bootstrap.cc
test/cpp/interop/server_helper.cc

@ -8988,6 +8988,7 @@ targets:
- src/proto/grpc/testing/messages.proto
- src/proto/grpc/testing/test.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- src/cpp/server/orca/orca_service.cc
- test/cpp/interop/interop_server.cc
- test/cpp/interop/interop_server_bootstrap.cc
- test/cpp/interop/server_helper.cc

@ -177,6 +177,9 @@ message StreamingOutputCallRequest {
// Whether server should return a given status
EchoStatus response_status = 7;
// ORCA data to be returned out of band
xds.data.orca.v3.OrcaLoadReport orca_oob_report = 8;
}
// Server-streaming response, as configured by the request and parameters.

@ -83,6 +83,7 @@ grpc_cc_library(
language = "C++",
deps = [
":server_helper_lib",
"//:grpcpp_orca_service",
"//src/proto/grpc/testing:empty_proto",
"//src/proto/grpc/testing:messages_proto",
"//src/proto/grpc/testing:test_proto",

@ -20,6 +20,9 @@
#include "test/cpp/interop/backend_metrics_lb_policy.h"
#include "absl/strings/str_format.h"
#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h"
#include "src/core/lib/iomgr/pollset_set.h"
namespace grpc {
@ -115,6 +118,20 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy {
LoadReportTracker* load_report_tracker_;
};
class OobMetricWatcher : public grpc_core::OobBackendMetricWatcher {
public:
explicit OobMetricWatcher(LoadReportTracker* load_report_tracker)
: load_report_tracker_(load_report_tracker) {}
private:
void OnBackendMetricReport(
const grpc_core::BackendMetricData& backend_metric_data) override {
load_report_tracker_->RecordOobLoadReport(backend_metric_data);
}
LoadReportTracker* load_report_tracker_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<BackendMetricsLbPolicy> parent)
@ -123,8 +140,12 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy {
RefCountedPtr<grpc_core::SubchannelInterface> CreateSubchannel(
grpc_core::ServerAddress address,
const grpc_core::ChannelArgs& args) override {
return parent_->channel_control_helper()->CreateSubchannel(
auto subchannel = parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
grpc_core::Duration::Seconds(1),
std::make_unique<OobMetricWatcher>(parent_->load_report_tracker_)));
return subchannel;
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
@ -216,14 +237,21 @@ void RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder* builder) {
void LoadReportTracker::RecordPerRpcLoadReport(
const grpc_core::BackendMetricData* backend_metric_data) {
absl::MutexLock lock(&per_rpc_load_reports_mu_);
absl::MutexLock lock(&load_reports_mu_);
per_rpc_load_reports_.emplace_back(
BackendMetricDataToOrcaLoadReport(backend_metric_data));
}
void LoadReportTracker::RecordOobLoadReport(
const grpc_core::BackendMetricData& oob_metric_data) {
absl::MutexLock lock(&load_reports_mu_);
oob_load_reports_.emplace_back(
*BackendMetricDataToOrcaLoadReport(&oob_metric_data));
}
absl::optional<LoadReportTracker::LoadReportEntry>
LoadReportTracker::GetNextLoadReport() {
absl::MutexLock lock(&per_rpc_load_reports_mu_);
absl::MutexLock lock(&load_reports_mu_);
if (per_rpc_load_reports_.empty()) {
return absl::nullopt;
}
@ -232,9 +260,34 @@ LoadReportTracker::GetNextLoadReport() {
return report;
}
LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
const std::function<bool(const xds::data::orca::v3::OrcaLoadReport&)>&
predicate,
absl::Duration poll_timeout, size_t max_attempts) {
absl::MutexLock lock(&load_reports_mu_);
// This condition will be called under lock
auto condition = [&]() ABSL_NO_THREAD_SAFETY_ANALYSIS {
return !oob_load_reports_.empty();
};
for (size_t i = 0; i < max_attempts; i++) {
if (!load_reports_mu_.AwaitWithTimeout(absl::Condition(&condition),
poll_timeout)) {
return absl::nullopt;
}
auto report = std::move(oob_load_reports_.front());
oob_load_reports_.pop_front();
if (predicate(report)) {
gpr_log(GPR_DEBUG, "Report #%" PRIuPTR " matched", i + 1);
return report;
}
}
return absl::nullopt;
}
void LoadReportTracker::ResetCollectedLoadReports() {
absl::MutexLock lock(&per_rpc_load_reports_mu_);
absl::MutexLock lock(&load_reports_mu_);
per_rpc_load_reports_.clear();
oob_load_reports_.clear();
}
ChannelArguments LoadReportTracker::GetChannelArguments() {

@ -37,13 +37,20 @@ class LoadReportTracker {
void ResetCollectedLoadReports();
void RecordPerRpcLoadReport(
const grpc_core::BackendMetricData* backend_metric_data);
// Returns the next load report, or nullopt if the queue is empty.
void RecordOobLoadReport(const grpc_core::BackendMetricData& oob_metric_data);
// Returns the next per-RPC load report, or nullopt if the queue is empty.
absl::optional<LoadReportEntry> GetNextLoadReport();
LoadReportEntry WaitForOobLoadReport(
const std::function<bool(const xds::data::orca::v3::OrcaLoadReport&)>&
predicate,
absl::Duration poll_timeout, size_t max_attempts);
private:
std::deque<LoadReportEntry> per_rpc_load_reports_
ABSL_GUARDED_BY(per_rpc_load_reports_mu_);
absl::Mutex per_rpc_load_reports_mu_;
ABSL_GUARDED_BY(load_reports_mu_);
std::deque<xds::data::orca::v3::OrcaLoadReport> oob_load_reports_
ABSL_GUARDED_BY(load_reports_mu_);
absl::Mutex load_reports_mu_;
};
void RegisterBackendMetricsLbPolicy(

@ -70,6 +70,7 @@ ABSL_FLAG(
//"long_lived_channel: sends large_unary rpcs over a long-lived channel;\n"
//"oauth2_auth_token: raw oauth2 access token auth;\n"
//"orca_per_rpc: custom LB policy receives per-query metric reports;\n"
//"orca_oob: receives out-of-band metric reports from the backend;\n"
//"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
//"ping_pong : full-duplex streaming;\n"
//"response streaming;\n"
@ -271,6 +272,8 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoPickFirstUnary, &client);
actions["orca_per_rpc"] =
std::bind(&grpc::testing::InteropClient::DoOrcaPerRpc, &client);
actions["orca_oob"] =
std::bind(&grpc::testing::InteropClient::DoOrcaOob, &client);
if (absl::GetFlag(FLAGS_use_tls)) {
actions["compute_engine_creds"] =
std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client,

@ -25,8 +25,10 @@
#include <type_traits>
#include <utility>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/match.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -98,38 +100,61 @@ void InitializeCustomLbPolicyIfNeeded() {
}
}
absl::optional<std::string> ValuesDiff(absl::string_view field, double expected,
double actual) {
if (expected != actual) {
return absl::StrFormat("%s: expected: %f, actual: %f", field, expected,
actual);
}
return absl::nullopt;
}
template <typename Map>
bool SameMaps(const std::string& path, const Map& expected, const Map& actual) {
if (expected.size() != actual.size()) {
gpr_log(GPR_ERROR, "Field %s does not match: expected %lu entries, got %lu",
path.c_str(), expected.size(), actual.size());
return false;
absl::optional<std::string> MapsDiff(absl::string_view path,
const Map& expected, const Map& actual) {
auto result = ValuesDiff(absl::StrFormat("%s size", path), expected.size(),
actual.size());
if (result.has_value()) {
return result;
}
for (const auto& key_value : expected) {
auto it = actual.find(key_value.first);
if (it == actual.end()) {
gpr_log(GPR_ERROR, "In field %s, key %s was not found", path.c_str(),
key_value.first.c_str());
return false;
return absl::StrFormat("In field %s, key %s was not found", path,
key_value.first);
}
if (key_value.second != it->second) {
gpr_log(
GPR_ERROR, "In field %s, value %s mismatch: expected %f, actual: %f",
path.c_str(), key_value.first.c_str(), key_value.second, it->second);
return false;
result = ValuesDiff(absl::StrFormat("%s/%s", path, key_value.first),
key_value.second, it->second);
if (result.has_value()) {
return result;
}
}
return true;
return absl::nullopt;
}
void SameOrcaLoadReports(const xds::data::orca::v3::OrcaLoadReport& expected,
const xds::data::orca::v3::OrcaLoadReport& actual) {
GPR_ASSERT(expected.cpu_utilization() == actual.cpu_utilization());
GPR_ASSERT(expected.mem_utilization() == actual.mem_utilization());
GPR_ASSERT(
SameMaps("request_cost", expected.request_cost(), actual.request_cost()));
GPR_ASSERT(
SameMaps("utilization", expected.utilization(), actual.utilization()));
absl::optional<std::string> OrcaLoadReportsDiff(
const xds::data::orca::v3::OrcaLoadReport& expected,
const xds::data::orca::v3::OrcaLoadReport& actual) {
auto error = ValuesDiff("cpu_utilization", expected.cpu_utilization(),
actual.cpu_utilization());
if (error.has_value()) {
return error;
}
error = ValuesDiff("mem_utilization", expected.mem_utilization(),
actual.mem_utilization());
if (error.has_value()) {
return error;
}
error =
MapsDiff("request_cost", expected.request_cost(), actual.request_cost());
if (error.has_value()) {
return error;
}
error = MapsDiff("utilization", expected.utilization(), actual.utilization());
if (error.has_value()) {
return error;
}
return absl::nullopt;
}
} // namespace
@ -1006,12 +1031,81 @@ bool InteropClient::DoOrcaPerRpc() {
auto report = load_report_tracker_.GetNextLoadReport();
GPR_ASSERT(report.has_value());
GPR_ASSERT(report->has_value());
SameOrcaLoadReports(report->value(), *orca_report);
auto comparison_result = OrcaLoadReportsDiff(report->value(), *orca_report);
if (comparison_result.has_value()) {
gpr_assertion_failed(__FILE__, __LINE__, comparison_result->c_str());
}
GPR_ASSERT(!load_report_tracker_.GetNextLoadReport().has_value());
gpr_log(GPR_DEBUG, "orca per rpc successfully finished");
return true;
}
bool InteropClient::DoOrcaOob() {
gpr_log(GPR_DEBUG, "testing orca oob");
ClientContext context;
std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
StreamingOutputCallResponse>>
stream(serviceStub_.Get()->FullDuplexCall(&context));
auto stream_cleanup = absl::MakeCleanup([&]() {
GPR_ASSERT(stream->WritesDone());
GPR_ASSERT(stream->Finish().ok());
});
{
StreamingOutputCallRequest request;
request.add_response_parameters()->set_size(1);
xds::data::orca::v3::OrcaLoadReport* orca_report =
request.mutable_orca_oob_report();
orca_report->set_cpu_utilization(0.8210);
orca_report->set_mem_utilization(0.5847);
orca_report->mutable_utilization()->emplace("util", 0.30499);
StreamingOutputCallResponse response;
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
return TransientFailureOrAbort();
}
if (!stream->Read(&response)) {
gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
return TransientFailureOrAbort();
}
GPR_ASSERT(
load_report_tracker_
.WaitForOobLoadReport(
[orca_report](const auto& actual) {
return !OrcaLoadReportsDiff(*orca_report, actual).has_value();
},
absl::Seconds(5), 10)
.has_value());
}
{
StreamingOutputCallRequest request;
request.add_response_parameters()->set_size(1);
xds::data::orca::v3::OrcaLoadReport* orca_report =
request.mutable_orca_oob_report();
orca_report->set_cpu_utilization(0.29309);
orca_report->set_mem_utilization(0.2);
orca_report->mutable_utilization()->emplace("util", 0.2039);
StreamingOutputCallResponse response;
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
return TransientFailureOrAbort();
}
if (!stream->Read(&response)) {
gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
return TransientFailureOrAbort();
}
GPR_ASSERT(
load_report_tracker_
.WaitForOobLoadReport(
[orca_report](const auto& report) {
return !OrcaLoadReportsDiff(*orca_report, report).has_value();
},
absl::Seconds(5), 10)
.has_value());
}
gpr_log(GPR_DEBUG, "orca oob successfully finished");
return true;
}
bool InteropClient::DoCustomMetadata() {
const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
const std::string kInitialMetadataValue("test_initial_metadata_value");

@ -78,6 +78,7 @@ class InteropClient {
// all requests are sent to one server despite multiple servers are resolved
bool DoPickFirstUnary();
bool DoOrcaPerRpc();
bool DoOrcaOob();
// The following interop test are not yet part of the interop spec, and are
// not implemented cross-language. They are considered experimental for now,

@ -27,6 +27,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/ext/orca_service.h>
#include <grpcpp/ext/server_metric_recorder.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
@ -139,8 +140,9 @@ bool CheckExpectedCompression(const ServerContext& context,
return true;
}
void RecordMetrics(ServerContext* context,
const xds::data::orca::v3::OrcaLoadReport& request_metrics) {
void RecordCallMetrics(
ServerContext* context,
const xds::data::orca::v3::OrcaLoadReport& request_metrics) {
auto recorder = context->ExperimentalGetCallMetricRecorder();
// Do not record when zero since it indicates no test per-call report.
if (request_metrics.cpu_utilization() > 0) {
@ -170,6 +172,10 @@ void RecordMetrics(ServerContext* context,
class TestServiceImpl : public TestService::Service {
public:
explicit TestServiceImpl(
grpc::experimental::ServerMetricRecorder* server_metric_recorder)
: server_metric_recorder_(server_metric_recorder) {}
Status EmptyCall(ServerContext* context,
const grpc::testing::Empty* /*request*/,
grpc::testing::Empty* /*response*/) override {
@ -220,7 +226,7 @@ class TestServiceImpl : public TestService::Service {
request->response_status().message());
}
if (request->has_orca_per_rpc_report()) {
RecordMetrics(context, request->orca_per_rpc_report());
RecordCallMetrics(context, request->orca_per_rpc_report());
}
return Status::OK;
}
@ -313,6 +319,9 @@ class TestServiceImpl : public TestService::Service {
}
write_success = stream->Write(response);
}
if (request.has_orca_oob_report()) {
RecordServerMetrics(request.orca_oob_report());
}
}
if (write_success) {
return Status::OK;
@ -349,6 +358,35 @@ class TestServiceImpl : public TestService::Service {
return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
}
}
private:
void RecordServerMetrics(
const xds::data::orca::v3::OrcaLoadReport& request_metrics) {
// Do not record when zero since it indicates no test per-call report.
if (request_metrics.cpu_utilization() > 0) {
server_metric_recorder_->SetCpuUtilization(
request_metrics.cpu_utilization());
}
if (request_metrics.mem_utilization() > 0) {
server_metric_recorder_->SetMemoryUtilization(
request_metrics.mem_utilization());
}
if (request_metrics.rps_fractional() > 0) {
server_metric_recorder_->SetQps(request_metrics.rps_fractional());
}
absl::MutexLock lock(&retained_utilization_names_mu_);
std::map<grpc::string_ref, double> named_utilizations;
for (const auto& p : request_metrics.utilization()) {
const auto& key = *retained_utilization_names_.insert(p.first).first;
named_utilizations.emplace(key, p.second);
}
server_metric_recorder_->SetAllNamedUtilization(named_utilizations);
}
grpc::experimental::ServerMetricRecorder* server_metric_recorder_;
std::set<std::string> retained_utilization_names_
ABSL_GUARDED_BY(retained_utilization_names_mu_);
absl::Mutex retained_utilization_names_mu_;
};
void grpc::testing::interop::RunServer(
@ -378,13 +416,16 @@ void grpc::testing::interop::RunServer(
GPR_ASSERT(port != 0);
std::ostringstream server_address;
server_address << "0.0.0.0:" << port;
TestServiceImpl service;
SimpleRequest request;
SimpleResponse response;
auto server_metric_recorder =
grpc::experimental::ServerMetricRecorder::Create();
TestServiceImpl service(server_metric_recorder.get());
grpc::experimental::OrcaService orca_service(
server_metric_recorder.get(),
experimental::OrcaService::Options().set_min_report_duration(
absl::Seconds(0.1)));
ServerBuilder builder;
builder.RegisterService(&service);
builder.RegisterService(&orca_service);
builder.AddListeningPort(server_address.str(), creds);
if (server_options != nullptr) {
for (size_t i = 0; i < server_options->size(); i++) {
@ -395,7 +436,7 @@ void grpc::testing::interop::RunServer(
builder.SetMaxSendMessageSize(absl::GetFlag(FLAGS_max_send_message_size));
}
grpc::ServerBuilder::experimental_type(&builder).EnableCallMetricRecording(
nullptr);
server_metric_recorder.get());
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());

Loading…
Cancel
Save