diff --git a/CMakeLists.txt b/CMakeLists.txt index bb91cf8b6d6..d9cab119d93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14169,6 +14169,7 @@ add_executable(interop_server ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h + src/cpp/server/orca/orca_service.cc test/cpp/interop/interop_server.cc test/cpp/interop/interop_server_bootstrap.cc test/cpp/interop/server_helper.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 744fd06f252..7d93f037654 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8988,6 +8988,7 @@ targets: - src/proto/grpc/testing/messages.proto - src/proto/grpc/testing/test.proto - src/proto/grpc/testing/xds/v3/orca_load_report.proto + - src/cpp/server/orca/orca_service.cc - test/cpp/interop/interop_server.cc - test/cpp/interop/interop_server_bootstrap.cc - test/cpp/interop/server_helper.cc diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index ed7f8ead793..921e8bf0d41 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -177,6 +177,9 @@ message StreamingOutputCallRequest { // Whether server should return a given status EchoStatus response_status = 7; + + // ORCA data to be returned out of band + xds.data.orca.v3.OrcaLoadReport orca_oob_report = 8; } // Server-streaming response, as configured by the request and parameters. 
diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD index 7b64a4e7ac9..6178cdab986 100644 --- a/test/cpp/interop/BUILD +++ b/test/cpp/interop/BUILD @@ -83,6 +83,7 @@ grpc_cc_library( language = "C++", deps = [ ":server_helper_lib", + "//:grpcpp_orca_service", "//src/proto/grpc/testing:empty_proto", "//src/proto/grpc/testing:messages_proto", "//src/proto/grpc/testing:test_proto", diff --git a/test/cpp/interop/backend_metrics_lb_policy.cc b/test/cpp/interop/backend_metrics_lb_policy.cc index 93eb7caf43b..5a3eb1dbbab 100644 --- a/test/cpp/interop/backend_metrics_lb_policy.cc +++ b/test/cpp/interop/backend_metrics_lb_policy.cc @@ -20,6 +20,9 @@ #include "test/cpp/interop/backend_metrics_lb_policy.h" +#include "absl/strings/str_format.h" + +#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h" #include "src/core/lib/iomgr/pollset_set.h" namespace grpc { @@ -115,6 +118,20 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy { LoadReportTracker* load_report_tracker_; }; + class OobMetricWatcher : public grpc_core::OobBackendMetricWatcher { + public: + explicit OobMetricWatcher(LoadReportTracker* load_report_tracker) + : load_report_tracker_(load_report_tracker) {} + + private: + void OnBackendMetricReport( + const grpc_core::BackendMetricData& backend_metric_data) override { + load_report_tracker_->RecordOobLoadReport(backend_metric_data); + } + + LoadReportTracker* load_report_tracker_; + }; + class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr parent) @@ -123,8 +140,12 @@ class BackendMetricsLbPolicy : public LoadBalancingPolicy { RefCountedPtr CreateSubchannel( grpc_core::ServerAddress address, const grpc_core::ChannelArgs& args) override { - return parent_->channel_control_helper()->CreateSubchannel( + auto subchannel = parent_->channel_control_helper()->CreateSubchannel( std::move(address), args); + subchannel->AddDataWatcher(MakeOobBackendMetricWatcher( + grpc_core::Duration::Seconds(1), + 
std::make_unique(parent_->load_report_tracker_))); + return subchannel; } void UpdateState(grpc_connectivity_state state, const absl::Status& status, @@ -216,14 +237,21 @@ void RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder* builder) { void LoadReportTracker::RecordPerRpcLoadReport( const grpc_core::BackendMetricData* backend_metric_data) { - absl::MutexLock lock(&per_rpc_load_reports_mu_); + absl::MutexLock lock(&load_reports_mu_); per_rpc_load_reports_.emplace_back( BackendMetricDataToOrcaLoadReport(backend_metric_data)); } +void LoadReportTracker::RecordOobLoadReport( + const grpc_core::BackendMetricData& oob_metric_data) { + absl::MutexLock lock(&load_reports_mu_); + oob_load_reports_.emplace_back( + *BackendMetricDataToOrcaLoadReport(&oob_metric_data)); +} + absl::optional LoadReportTracker::GetNextLoadReport() { - absl::MutexLock lock(&per_rpc_load_reports_mu_); + absl::MutexLock lock(&load_reports_mu_); if (per_rpc_load_reports_.empty()) { return absl::nullopt; } @@ -232,9 +260,34 @@ LoadReportTracker::GetNextLoadReport() { return report; } +LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport( + const std::function& + predicate, + absl::Duration poll_timeout, size_t max_attempts) { + absl::MutexLock lock(&load_reports_mu_); + // This condition will be called under lock + auto condition = [&]() ABSL_NO_THREAD_SAFETY_ANALYSIS { + return !oob_load_reports_.empty(); + }; + for (size_t i = 0; i < max_attempts; i++) { + if (!load_reports_mu_.AwaitWithTimeout(absl::Condition(&condition), + poll_timeout)) { + return absl::nullopt; + } + auto report = std::move(oob_load_reports_.front()); + oob_load_reports_.pop_front(); + if (predicate(report)) { + gpr_log(GPR_DEBUG, "Report #%" PRIuPTR " matched", i + 1); + return report; + } + } + return absl::nullopt; +} + void LoadReportTracker::ResetCollectedLoadReports() { - absl::MutexLock lock(&per_rpc_load_reports_mu_); + absl::MutexLock lock(&load_reports_mu_); 
per_rpc_load_reports_.clear(); + oob_load_reports_.clear(); } ChannelArguments LoadReportTracker::GetChannelArguments() { diff --git a/test/cpp/interop/backend_metrics_lb_policy.h b/test/cpp/interop/backend_metrics_lb_policy.h index 9853f3279a1..cf0f1618b67 100644 --- a/test/cpp/interop/backend_metrics_lb_policy.h +++ b/test/cpp/interop/backend_metrics_lb_policy.h @@ -37,13 +37,20 @@ class LoadReportTracker { void ResetCollectedLoadReports(); void RecordPerRpcLoadReport( const grpc_core::BackendMetricData* backend_metric_data); - // Returns the next load report, or nullopt if the queue is empty. + void RecordOobLoadReport(const grpc_core::BackendMetricData& oob_metric_data); + // Returns the next per-RPC load report, or nullopt if the queue is empty. absl::optional GetNextLoadReport(); + LoadReportEntry WaitForOobLoadReport( + const std::function& + predicate, + absl::Duration poll_timeout, size_t max_attempts); private: std::deque per_rpc_load_reports_ - ABSL_GUARDED_BY(per_rpc_load_reports_mu_); - absl::Mutex per_rpc_load_reports_mu_; + ABSL_GUARDED_BY(load_reports_mu_); + std::deque oob_load_reports_ + ABSL_GUARDED_BY(load_reports_mu_); + absl::Mutex load_reports_mu_; }; void RegisterBackendMetricsLbPolicy( diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 874e7bc7b7a..cf3ada95b03 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -70,6 +70,7 @@ ABSL_FLAG( //"long_lived_channel: sends large_unary rpcs over a long-lived channel;\n" //"oauth2_auth_token: raw oauth2 access token auth;\n" //"orca_per_rpc: custom LB policy receives per-query metric reports;\n" + //"orca_oob: receives out-of-band metric reports from the backend;\n" //"per_rpc_creds: raw oauth2 access token on a single rpc;\n" //"ping_pong : full-duplex streaming;\n" //"response streaming;\n" @@ -271,6 +272,8 @@ int main(int argc, char** argv) { std::bind(&grpc::testing::InteropClient::DoPickFirstUnary, &client); actions["orca_per_rpc"] = 
std::bind(&grpc::testing::InteropClient::DoOrcaPerRpc, &client); + actions["orca_oob"] = + std::bind(&grpc::testing::InteropClient::DoOrcaOob, &client); if (absl::GetFlag(FLAGS_use_tls)) { actions["compute_engine_creds"] = std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client, diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 87721d3e253..a720e77e03a 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -25,8 +25,10 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/match.h" #include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" #include #include @@ -98,38 +100,61 @@ void InitializeCustomLbPolicyIfNeeded() { } } +absl::optional ValuesDiff(absl::string_view field, double expected, + double actual) { + if (expected != actual) { + return absl::StrFormat("%s: expected: %f, actual: %f", field, expected, + actual); + } + return absl::nullopt; +} + template -bool SameMaps(const std::string& path, const Map& expected, const Map& actual) { - if (expected.size() != actual.size()) { - gpr_log(GPR_ERROR, "Field %s does not match: expected %lu entries, got %lu", - path.c_str(), expected.size(), actual.size()); - return false; +absl::optional MapsDiff(absl::string_view path, + const Map& expected, const Map& actual) { + auto result = ValuesDiff(absl::StrFormat("%s size", path), expected.size(), + actual.size()); + if (result.has_value()) { + return result; } for (const auto& key_value : expected) { auto it = actual.find(key_value.first); if (it == actual.end()) { - gpr_log(GPR_ERROR, "In field %s, key %s was not found", path.c_str(), - key_value.first.c_str()); - return false; + return absl::StrFormat("In field %s, key %s was not found", path, + key_value.first); } - if (key_value.second != it->second) { - gpr_log( - GPR_ERROR, "In field %s, value %s mismatch: expected %f, actual: %f", - path.c_str(), key_value.first.c_str(), 
key_value.second, it->second); - return false; + result = ValuesDiff(absl::StrFormat("%s/%s", path, key_value.first), + key_value.second, it->second); + if (result.has_value()) { + return result; } } - return true; + return absl::nullopt; } -void SameOrcaLoadReports(const xds::data::orca::v3::OrcaLoadReport& expected, - const xds::data::orca::v3::OrcaLoadReport& actual) { - GPR_ASSERT(expected.cpu_utilization() == actual.cpu_utilization()); - GPR_ASSERT(expected.mem_utilization() == actual.mem_utilization()); - GPR_ASSERT( - SameMaps("request_cost", expected.request_cost(), actual.request_cost())); - GPR_ASSERT( - SameMaps("utilization", expected.utilization(), actual.utilization())); +absl::optional OrcaLoadReportsDiff( + const xds::data::orca::v3::OrcaLoadReport& expected, + const xds::data::orca::v3::OrcaLoadReport& actual) { + auto error = ValuesDiff("cpu_utilization", expected.cpu_utilization(), + actual.cpu_utilization()); + if (error.has_value()) { + return error; + } + error = ValuesDiff("mem_utilization", expected.mem_utilization(), + actual.mem_utilization()); + if (error.has_value()) { + return error; + } + error = + MapsDiff("request_cost", expected.request_cost(), actual.request_cost()); + if (error.has_value()) { + return error; + } + error = MapsDiff("utilization", expected.utilization(), actual.utilization()); + if (error.has_value()) { + return error; + } + return absl::nullopt; } } // namespace @@ -1006,12 +1031,81 @@ bool InteropClient::DoOrcaPerRpc() { auto report = load_report_tracker_.GetNextLoadReport(); GPR_ASSERT(report.has_value()); GPR_ASSERT(report->has_value()); - SameOrcaLoadReports(report->value(), *orca_report); + auto comparison_result = OrcaLoadReportsDiff(report->value(), *orca_report); + if (comparison_result.has_value()) { + gpr_assertion_failed(__FILE__, __LINE__, comparison_result->c_str()); + } GPR_ASSERT(!load_report_tracker_.GetNextLoadReport().has_value()); gpr_log(GPR_DEBUG, "orca per rpc successfully finished"); return 
true; } +bool InteropClient::DoOrcaOob() { + gpr_log(GPR_DEBUG, "testing orca oob"); + ClientContext context; + std::unique_ptr> + stream(serviceStub_.Get()->FullDuplexCall(&context)); + auto stream_cleanup = absl::MakeCleanup([&]() { + GPR_ASSERT(stream->WritesDone()); + GPR_ASSERT(stream->Finish().ok()); + }); + { + StreamingOutputCallRequest request; + request.add_response_parameters()->set_size(1); + xds::data::orca::v3::OrcaLoadReport* orca_report = + request.mutable_orca_oob_report(); + orca_report->set_cpu_utilization(0.8210); + orca_report->set_mem_utilization(0.5847); + orca_report->mutable_utilization()->emplace("util", 0.30499); + StreamingOutputCallResponse response; + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed"); + return TransientFailureOrAbort(); + } + GPR_ASSERT( + load_report_tracker_ + .WaitForOobLoadReport( + [orca_report](const auto& actual) { + return !OrcaLoadReportsDiff(*orca_report, actual).has_value(); + }, + absl::Seconds(5), 10) + .has_value()); + } + { + StreamingOutputCallRequest request; + request.add_response_parameters()->set_size(1); + xds::data::orca::v3::OrcaLoadReport* orca_report = + request.mutable_orca_oob_report(); + orca_report->set_cpu_utilization(0.29309); + orca_report->set_mem_utilization(0.2); + orca_report->mutable_utilization()->emplace("util", 0.2039); + StreamingOutputCallResponse response; + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed"); + return TransientFailureOrAbort(); + } + GPR_ASSERT( + load_report_tracker_ + .WaitForOobLoadReport( + [orca_report](const auto& report) { + return !OrcaLoadReportsDiff(*orca_report, report).has_value(); + }, + absl::Seconds(5), 10) 
+ .has_value()); + } + gpr_log(GPR_DEBUG, "orca oob successfully finished"); + return true; +} + bool InteropClient::DoCustomMetadata() { const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial"); const std::string kInitialMetadataValue("test_initial_metadata_value"); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 4fc7d79dd82..37915b62082 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -78,6 +78,7 @@ class InteropClient { // all requests are sent to one server despite multiple servers are resolved bool DoPickFirstUnary(); bool DoOrcaPerRpc(); + bool DoOrcaOob(); // The following interop test are not yet part of the interop spec, and are // not implemented cross-language. They are considered experimental for now, diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 7383c68d7c4..af4e51d8f9d 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -139,8 +140,9 @@ bool CheckExpectedCompression(const ServerContext& context, return true; } -void RecordMetrics(ServerContext* context, - const xds::data::orca::v3::OrcaLoadReport& request_metrics) { +void RecordCallMetrics( + ServerContext* context, + const xds::data::orca::v3::OrcaLoadReport& request_metrics) { auto recorder = context->ExperimentalGetCallMetricRecorder(); // Do not record when zero since it indicates no test per-call report. 
if (request_metrics.cpu_utilization() > 0) { @@ -170,6 +172,10 @@ void RecordMetrics(ServerContext* context, class TestServiceImpl : public TestService::Service { public: + explicit TestServiceImpl( + grpc::experimental::ServerMetricRecorder* server_metric_recorder) + : server_metric_recorder_(server_metric_recorder) {} + Status EmptyCall(ServerContext* context, const grpc::testing::Empty* /*request*/, grpc::testing::Empty* /*response*/) override { @@ -220,7 +226,7 @@ class TestServiceImpl : public TestService::Service { request->response_status().message()); } if (request->has_orca_per_rpc_report()) { - RecordMetrics(context, request->orca_per_rpc_report()); + RecordCallMetrics(context, request->orca_per_rpc_report()); } return Status::OK; } @@ -313,6 +319,9 @@ class TestServiceImpl : public TestService::Service { } write_success = stream->Write(response); } + if (request.has_orca_oob_report()) { + RecordServerMetrics(request.orca_oob_report()); + } } if (write_success) { return Status::OK; @@ -349,6 +358,35 @@ class TestServiceImpl : public TestService::Service { return Status(grpc::StatusCode::INTERNAL, "Error writing response."); } } + + private: + void RecordServerMetrics( + const xds::data::orca::v3::OrcaLoadReport& request_metrics) { + // Do not record when zero since it indicates no test out-of-band report.
+ if (request_metrics.cpu_utilization() > 0) { + server_metric_recorder_->SetCpuUtilization( + request_metrics.cpu_utilization()); + } + if (request_metrics.mem_utilization() > 0) { + server_metric_recorder_->SetMemoryUtilization( + request_metrics.mem_utilization()); + } + if (request_metrics.rps_fractional() > 0) { + server_metric_recorder_->SetQps(request_metrics.rps_fractional()); + } + absl::MutexLock lock(&retained_utilization_names_mu_); + std::map named_utilizations; + for (const auto& p : request_metrics.utilization()) { + const auto& key = *retained_utilization_names_.insert(p.first).first; + named_utilizations.emplace(key, p.second); + } + server_metric_recorder_->SetAllNamedUtilization(named_utilizations); + } + + grpc::experimental::ServerMetricRecorder* server_metric_recorder_; + std::set retained_utilization_names_ + ABSL_GUARDED_BY(retained_utilization_names_mu_); + absl::Mutex retained_utilization_names_mu_; }; void grpc::testing::interop::RunServer( @@ -378,13 +416,16 @@ void grpc::testing::interop::RunServer( GPR_ASSERT(port != 0); std::ostringstream server_address; server_address << "0.0.0.0:" << port; - TestServiceImpl service; - - SimpleRequest request; - SimpleResponse response; - + auto server_metric_recorder = + grpc::experimental::ServerMetricRecorder::Create(); + TestServiceImpl service(server_metric_recorder.get()); + grpc::experimental::OrcaService orca_service( + server_metric_recorder.get(), + experimental::OrcaService::Options().set_min_report_duration( + absl::Seconds(0.1))); ServerBuilder builder; builder.RegisterService(&service); + builder.RegisterService(&orca_service); builder.AddListeningPort(server_address.str(), creds); if (server_options != nullptr) { for (size_t i = 0; i < server_options->size(); i++) { @@ -395,7 +436,7 @@ void grpc::testing::interop::RunServer( builder.SetMaxSendMessageSize(absl::GetFlag(FLAGS_max_send_message_size)); } grpc::ServerBuilder::experimental_type(&builder).EnableCallMetricRecording( - 
nullptr); + server_metric_recorder.get()); std::unique_ptr server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());