mirror of https://github.com/grpc/grpc.git
This reverts commit 6d6380de58
.
pull/29352/head
parent
0b79940496
commit
71b355624f
16 changed files with 0 additions and 1203 deletions
@ -1,83 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPCPP_EXT_ORCA_SERVICE_H |
||||
#define GRPCPP_EXT_ORCA_SERVICE_H |
||||
|
||||
#include <map> |
||||
#include <string> |
||||
|
||||
#include "absl/time/time.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpcpp/impl/codegen/server_callback.h> |
||||
#include <grpcpp/impl/codegen/service_type.h> |
||||
#include <grpcpp/impl/codegen/sync.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/support/slice.h> |
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
// RPC service implementation for supplying out-of-band backend
|
||||
// utilization metrics to clients.
|
||||
class OrcaService : public Service { |
||||
public: |
||||
struct Options { |
||||
// Minimum report interval. If a client requests an interval lower
|
||||
// than this value, this value will be used instead.
|
||||
absl::Duration min_report_duration = absl::Seconds(30); |
||||
|
||||
Options() = default; |
||||
Options& set_min_report_duration(absl::Duration duration) { |
||||
min_report_duration = duration; |
||||
return *this; |
||||
} |
||||
}; |
||||
|
||||
explicit OrcaService(Options options); |
||||
|
||||
// Sets or removes the CPU utilization value to be reported to clients.
|
||||
void SetCpuUtilization(double cpu_utilization); |
||||
void DeleteCpuUtilization(); |
||||
|
||||
// Sets of removes the memory utilization value to be reported to clients.
|
||||
void SetMemoryUtilization(double memory_utilization); |
||||
void DeleteMemoryUtilization(); |
||||
|
||||
// Sets or removed named utilization values to be reported to clients.
|
||||
void SetNamedUtilization(std::string name, double utilization); |
||||
void DeleteNamedUtilization(const std::string& name); |
||||
void SetAllNamedUtilization(std::map<std::string, double> named_utilization); |
||||
|
||||
private: |
||||
class Reactor; |
||||
|
||||
Slice GetOrCreateSerializedResponse(); |
||||
|
||||
const absl::Duration min_report_duration_; |
||||
|
||||
grpc::internal::Mutex mu_; |
||||
double cpu_utilization_ ABSL_GUARDED_BY(&mu_) = -1; |
||||
double memory_utilization_ ABSL_GUARDED_BY(&mu_) = -1; |
||||
std::map<std::string, double> named_utilization_ ABSL_GUARDED_BY(&mu_); |
||||
absl::optional<Slice> response_slice_ ABSL_GUARDED_BY(&mu_); |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPCPP_EXT_ORCA_SERVICE_H
|
@ -1,47 +0,0 @@ |
||||
/* This file was generated by upbc (the upb compiler) from the input
|
||||
* file: |
||||
* |
||||
* xds/service/orca/v3/orca.proto |
||||
* |
||||
* Do not edit -- your changes will be discarded when the file is |
||||
* regenerated. */ |
||||
|
||||
#include <stddef.h> |
||||
#include "upb/msg_internal.h" |
||||
#include "xds/service/orca/v3/orca.upb.h" |
||||
#include "xds/data/orca/v3/orca_load_report.upb.h" |
||||
#include "google/protobuf/duration.upb.h" |
||||
#include "validate/validate.upb.h" |
||||
|
||||
#include "upb/port_def.inc" |
||||
|
||||
static const upb_MiniTable_Sub xds_service_orca_v3_OrcaLoadReportRequest_submsgs[1] = { |
||||
{.submsg = &google_protobuf_Duration_msginit}, |
||||
}; |
||||
|
||||
static const upb_MiniTable_Field xds_service_orca_v3_OrcaLoadReportRequest__fields[2] = { |
||||
{1, UPB_SIZE(4, 8), UPB_SIZE(1, 1), 0, 11, kUpb_FieldMode_Scalar | (kUpb_FieldRep_Pointer << kUpb_FieldRep_Shift)}, |
||||
{2, UPB_SIZE(8, 16), UPB_SIZE(0, 0), kUpb_NoSub, 9, kUpb_FieldMode_Array | (kUpb_FieldRep_Pointer << kUpb_FieldRep_Shift)}, |
||||
}; |
||||
|
||||
const upb_MiniTable xds_service_orca_v3_OrcaLoadReportRequest_msginit = { |
||||
&xds_service_orca_v3_OrcaLoadReportRequest_submsgs[0], |
||||
&xds_service_orca_v3_OrcaLoadReportRequest__fields[0], |
||||
UPB_SIZE(12, 24), 2, kUpb_ExtMode_NonExtendable, 2, 255, 0, |
||||
}; |
||||
|
||||
static const upb_MiniTable *messages_layout[1] = { |
||||
&xds_service_orca_v3_OrcaLoadReportRequest_msginit, |
||||
}; |
||||
|
||||
const upb_MiniTable_File xds_service_orca_v3_orca_proto_upb_file_layout = { |
||||
messages_layout, |
||||
NULL, |
||||
NULL, |
||||
1, |
||||
0, |
||||
0, |
||||
}; |
||||
|
||||
#include "upb/port_undef.inc" |
||||
|
@ -1,109 +0,0 @@ |
||||
/* This file was generated by upbc (the upb compiler) from the input
|
||||
* file: |
||||
* |
||||
* xds/service/orca/v3/orca.proto |
||||
* |
||||
* Do not edit -- your changes will be discarded when the file is |
||||
* regenerated. */ |
||||
|
||||
#ifndef XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ |
||||
#define XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ |
||||
|
||||
#include "upb/msg_internal.h" |
||||
#include "upb/decode.h" |
||||
#include "upb/decode_fast.h" |
||||
#include "upb/encode.h" |
||||
|
||||
#include "upb/port_def.inc" |
||||
|
||||
#ifdef __cplusplus |
||||
extern "C" { |
||||
#endif |
||||
|
||||
struct xds_service_orca_v3_OrcaLoadReportRequest; |
||||
typedef struct xds_service_orca_v3_OrcaLoadReportRequest xds_service_orca_v3_OrcaLoadReportRequest; |
||||
extern const upb_MiniTable xds_service_orca_v3_OrcaLoadReportRequest_msginit; |
||||
struct google_protobuf_Duration; |
||||
extern const upb_MiniTable google_protobuf_Duration_msginit; |
||||
|
||||
|
||||
|
||||
/* xds.service.orca.v3.OrcaLoadReportRequest */ |
||||
|
||||
UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_new(upb_Arena* arena) { |
||||
return (xds_service_orca_v3_OrcaLoadReportRequest*)_upb_Message_New(&xds_service_orca_v3_OrcaLoadReportRequest_msginit, arena); |
||||
} |
||||
UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_parse(const char* buf, size_t size, upb_Arena* arena) { |
||||
xds_service_orca_v3_OrcaLoadReportRequest* ret = xds_service_orca_v3_OrcaLoadReportRequest_new(arena); |
||||
if (!ret) return NULL; |
||||
if (upb_Decode(buf, size, ret, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, NULL, 0, arena) != kUpb_DecodeStatus_Ok) { |
||||
return NULL; |
||||
} |
||||
return ret; |
||||
} |
||||
UPB_INLINE xds_service_orca_v3_OrcaLoadReportRequest* xds_service_orca_v3_OrcaLoadReportRequest_parse_ex(const char* buf, size_t size, |
||||
const upb_ExtensionRegistry* extreg, |
||||
int options, upb_Arena* arena) { |
||||
xds_service_orca_v3_OrcaLoadReportRequest* ret = xds_service_orca_v3_OrcaLoadReportRequest_new(arena); |
||||
if (!ret) return NULL; |
||||
if (upb_Decode(buf, size, ret, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, extreg, options, arena) != |
||||
kUpb_DecodeStatus_Ok) { |
||||
return NULL; |
||||
} |
||||
return ret; |
||||
} |
||||
UPB_INLINE char* xds_service_orca_v3_OrcaLoadReportRequest_serialize(const xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_Arena* arena, size_t* len) { |
||||
return upb_Encode(msg, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, 0, arena, len); |
||||
} |
||||
UPB_INLINE char* xds_service_orca_v3_OrcaLoadReportRequest_serialize_ex(const xds_service_orca_v3_OrcaLoadReportRequest* msg, int options, |
||||
upb_Arena* arena, size_t* len) { |
||||
return upb_Encode(msg, &xds_service_orca_v3_OrcaLoadReportRequest_msginit, options, arena, len); |
||||
} |
||||
UPB_INLINE bool xds_service_orca_v3_OrcaLoadReportRequest_has_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { |
||||
return _upb_hasbit(msg, 1); |
||||
} |
||||
UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_clear_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { |
||||
*UPB_PTR_AT(msg, UPB_SIZE(4, 8), const upb_Message*) = NULL; |
||||
} |
||||
UPB_INLINE const struct google_protobuf_Duration* xds_service_orca_v3_OrcaLoadReportRequest_report_interval(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { |
||||
return *UPB_PTR_AT(msg, UPB_SIZE(4, 8), const struct google_protobuf_Duration*); |
||||
} |
||||
UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_clear_request_cost_names(const xds_service_orca_v3_OrcaLoadReportRequest* msg) { |
||||
_upb_array_detach(msg, UPB_SIZE(8, 16)); |
||||
} |
||||
UPB_INLINE upb_StringView const* xds_service_orca_v3_OrcaLoadReportRequest_request_cost_names(const xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t* len) { |
||||
return (upb_StringView const*)_upb_array_accessor(msg, UPB_SIZE(8, 16), len); |
||||
} |
||||
|
||||
UPB_INLINE void xds_service_orca_v3_OrcaLoadReportRequest_set_report_interval(xds_service_orca_v3_OrcaLoadReportRequest *msg, struct google_protobuf_Duration* value) { |
||||
_upb_sethas(msg, 1); |
||||
*UPB_PTR_AT(msg, UPB_SIZE(4, 8), struct google_protobuf_Duration*) = value; |
||||
} |
||||
UPB_INLINE struct google_protobuf_Duration* xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval(xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_Arena* arena) { |
||||
struct google_protobuf_Duration* sub = (struct google_protobuf_Duration*)xds_service_orca_v3_OrcaLoadReportRequest_report_interval(msg); |
||||
if (sub == NULL) { |
||||
sub = (struct google_protobuf_Duration*)_upb_Message_New(&google_protobuf_Duration_msginit, arena); |
||||
if (!sub) return NULL; |
||||
xds_service_orca_v3_OrcaLoadReportRequest_set_report_interval(msg, sub); |
||||
} |
||||
return sub; |
||||
} |
||||
UPB_INLINE upb_StringView* xds_service_orca_v3_OrcaLoadReportRequest_mutable_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t* len) { |
||||
return (upb_StringView*)_upb_array_mutable_accessor(msg, UPB_SIZE(8, 16), len); |
||||
} |
||||
UPB_INLINE upb_StringView* xds_service_orca_v3_OrcaLoadReportRequest_resize_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, size_t len, upb_Arena* arena) { |
||||
return (upb_StringView*)_upb_Array_Resize_accessor2(msg, UPB_SIZE(8, 16), len, UPB_SIZE(3, 4), arena); |
||||
} |
||||
UPB_INLINE bool xds_service_orca_v3_OrcaLoadReportRequest_add_request_cost_names(xds_service_orca_v3_OrcaLoadReportRequest* msg, upb_StringView val, upb_Arena* arena) { |
||||
return _upb_Array_Append_accessor2(msg, UPB_SIZE(8, 16), UPB_SIZE(3, 4), &val, arena); |
||||
} |
||||
|
||||
extern const upb_MiniTable_File xds_service_orca_v3_orca_proto_upb_file_layout; |
||||
|
||||
#ifdef __cplusplus |
||||
} /* extern "C" */ |
||||
#endif |
||||
|
||||
#include "upb/port_undef.inc" |
||||
|
||||
#endif /* XDS_SERVICE_ORCA_V3_ORCA_PROTO_UPB_H_ */ |
@ -1,224 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "google/protobuf/duration.upb.h" |
||||
#include "upb/upb.hpp" |
||||
#include "xds/data/orca/v3/orca_load_report.upb.h" |
||||
#include "xds/service/orca/v3/orca.upb.h" |
||||
|
||||
#include <grpcpp/ext/orca_service.h> |
||||
#include <grpcpp/impl/codegen/server_callback_handlers.h> |
||||
|
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
//
|
||||
// OrcaService::Reactor
|
||||
//
|
||||
|
||||
class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>, |
||||
public grpc_core::RefCounted<Reactor> { |
||||
public: |
||||
explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) |
||||
: service_(service) { |
||||
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); |
||||
// Get slice from request.
|
||||
Slice slice; |
||||
request_buffer->DumpToSingleSlice(&slice); |
||||
// Parse request proto.
|
||||
upb::Arena arena; |
||||
xds_service_orca_v3_OrcaLoadReportRequest* request = |
||||
xds_service_orca_v3_OrcaLoadReportRequest_parse( |
||||
reinterpret_cast<const char*>(slice.begin()), slice.size(), |
||||
arena.ptr()); |
||||
if (request == nullptr) { |
||||
Finish(Status(StatusCode::INTERNAL, "could not parse request proto")); |
||||
return; |
||||
} |
||||
const auto* duration_proto = |
||||
xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request); |
||||
if (duration_proto != nullptr) { |
||||
report_interval_ = grpc_core::Duration::FromSecondsAndNanoseconds( |
||||
google_protobuf_Duration_seconds(duration_proto), |
||||
google_protobuf_Duration_nanos(duration_proto)); |
||||
} |
||||
auto min_interval = grpc_core::Duration::Milliseconds( |
||||
service_->min_report_duration_ / absl::Milliseconds(1)); |
||||
if (report_interval_ < min_interval) report_interval_ = min_interval; |
||||
// Send initial response.
|
||||
SendResponse(); |
||||
} |
||||
|
||||
void OnWriteDone(bool ok) override { |
||||
if (!ok) { |
||||
Finish(Status(StatusCode::UNKNOWN, "write failed")); |
||||
return; |
||||
} |
||||
response_.Clear(); |
||||
ScheduleTimer(); |
||||
} |
||||
|
||||
void OnCancel() override { |
||||
MaybeCancelTimer(); |
||||
Finish(Status(StatusCode::UNKNOWN, "call cancelled by client")); |
||||
} |
||||
|
||||
void OnDone() override { |
||||
// Free the initial ref from instantiation.
|
||||
Unref(); |
||||
} |
||||
|
||||
private: |
||||
void SendResponse() { |
||||
Slice response_slice = service_->GetOrCreateSerializedResponse(); |
||||
ByteBuffer response_buffer(&response_slice, 1); |
||||
response_.Swap(&response_buffer); |
||||
StartWrite(&response_); |
||||
} |
||||
|
||||
void ScheduleTimer() { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
Ref().release(); // Ref held by timer.
|
||||
grpc::internal::MutexLock lock(&timer_mu_); |
||||
timer_pending_ = true; |
||||
grpc_timer_init(&timer_, exec_ctx.Now() + report_interval_, &on_timer_); |
||||
} |
||||
|
||||
void MaybeCancelTimer() { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc::internal::MutexLock lock(&timer_mu_); |
||||
if (timer_pending_) { |
||||
timer_pending_ = false; |
||||
grpc_timer_cancel(&timer_); |
||||
} |
||||
} |
||||
|
||||
static void OnTimer(void* arg, grpc_error_handle error) { |
||||
grpc_core::RefCountedPtr<Reactor> self(static_cast<Reactor*>(arg)); |
||||
grpc::internal::MutexLock lock(&self->timer_mu_); |
||||
if (error == GRPC_ERROR_NONE && self->timer_pending_) { |
||||
self->timer_pending_ = false; |
||||
self->SendResponse(); |
||||
} |
||||
} |
||||
|
||||
OrcaService* service_; |
||||
|
||||
// TODO(roth): Change this to use the EventEngine API once it becomes
|
||||
// available.
|
||||
grpc::internal::Mutex timer_mu_; |
||||
bool timer_pending_ ABSL_GUARDED_BY(&timer_mu_) = false; |
||||
grpc_timer timer_ ABSL_GUARDED_BY(&timer_mu_); |
||||
grpc_closure on_timer_; |
||||
|
||||
grpc_core::Duration report_interval_; |
||||
ByteBuffer response_; |
||||
}; |
||||
|
||||
//
|
||||
// OrcaService
|
||||
//
|
||||
|
||||
OrcaService::OrcaService(OrcaService::Options options) |
||||
: min_report_duration_(options.min_report_duration) { |
||||
AddMethod(new internal::RpcServiceMethod( |
||||
"/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics", |
||||
internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr)); |
||||
MarkMethodCallback( |
||||
0, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>( |
||||
[this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { |
||||
return new Reactor(this, request); |
||||
})); |
||||
} |
||||
|
||||
void OrcaService::SetCpuUtilization(double cpu_utilization) { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
cpu_utilization_ = cpu_utilization; |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::DeleteCpuUtilization() { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
cpu_utilization_ = -1; |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::SetMemoryUtilization(double memory_utilization) { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
memory_utilization_ = memory_utilization; |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::DeleteMemoryUtilization() { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
memory_utilization_ = -1; |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::SetNamedUtilization(std::string name, double utilization) { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
named_utilization_[std::move(name)] = utilization; |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::DeleteNamedUtilization(const std::string& name) { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
named_utilization_.erase(name); |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
void OrcaService::SetAllNamedUtilization( |
||||
std::map<std::string, double> named_utilization) { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
named_utilization_ = std::move(named_utilization); |
||||
response_slice_.reset(); |
||||
} |
||||
|
||||
Slice OrcaService::GetOrCreateSerializedResponse() { |
||||
grpc::internal::MutexLock lock(&mu_); |
||||
if (!response_slice_.has_value()) { |
||||
upb::Arena arena; |
||||
xds_data_orca_v3_OrcaLoadReport* response = |
||||
xds_data_orca_v3_OrcaLoadReport_new(arena.ptr()); |
||||
if (cpu_utilization_ != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response, |
||||
cpu_utilization_); |
||||
} |
||||
if (memory_utilization_ != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response, |
||||
memory_utilization_); |
||||
} |
||||
for (const auto& p : named_utilization_) { |
||||
xds_data_orca_v3_OrcaLoadReport_utilization_set( |
||||
response, |
||||
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), |
||||
p.second, arena.ptr()); |
||||
} |
||||
size_t buf_length; |
||||
char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(), |
||||
&buf_length); |
||||
response_slice_.emplace(buf, buf_length); |
||||
} |
||||
return Slice(*response_slice_); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
@ -1,47 +0,0 @@ |
||||
// Copyright 2022 gRPC authors. |
||||
// |
||||
// Licensed under the Apache License, Version 2.0 (the "License"); |
||||
// you may not use this file except in compliance with the License. |
||||
// You may obtain a copy of the License at |
||||
// |
||||
// http://www.apache.org/licenses/LICENSE-2.0 |
||||
// |
||||
// Unless required by applicable law or agreed to in writing, software |
||||
// distributed under the License is distributed on an "AS IS" BASIS, |
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
// See the License for the specific language governing permissions and |
||||
// limitations under the License. |
||||
|
||||
// Local copy of Envoy xDS proto file, used for testing only. |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package xds.service.orca.v3; |
||||
|
||||
import "src/proto/grpc/testing/xds/v3/orca_load_report.proto"; |
||||
|
||||
import "google/protobuf/duration.proto"; |
||||
|
||||
// See section `Out-of-band (OOB) reporting` of the design document in |
||||
// :ref:`https://github.com/envoyproxy/envoy/issues/6614`. |
||||
|
||||
// Out-of-band (OOB) load reporting service for the additional load reporting |
||||
// agent that does not sit in the request path. Reports are periodically sampled |
||||
// with sufficient frequency to provide temporal association with requests. |
||||
// OOB reporting compensates the limitation of in-band reporting in revealing |
||||
// costs for backends that do not provide a steady stream of telemetry such as |
||||
// long running stream operations and zero QPS services. This is a server |
||||
// streaming service, client needs to terminate current RPC and initiate |
||||
// a new call to change backend reporting frequency. |
||||
service OpenRcaService { |
||||
rpc StreamCoreMetrics(OrcaLoadReportRequest) returns (stream xds.data.orca.v3.OrcaLoadReport); |
||||
} |
||||
|
||||
message OrcaLoadReportRequest { |
||||
// Interval for generating Open RCA core metric responses. |
||||
google.protobuf.Duration report_interval = 1; |
||||
// Request costs to collect. If this is empty, all known requests costs tracked by |
||||
// the load reporting agent will be returned. This provides an opportunity for |
||||
// the client to selectively obtain a subset of tracked costs. |
||||
repeated string request_cost_names = 2; |
||||
} |
@ -1,200 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpcpp/channel.h> |
||||
#include <grpcpp/client_context.h> |
||||
#include <grpcpp/create_channel.h> |
||||
#include <grpcpp/ext/orca_service.h> |
||||
#include <grpcpp/server.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
|
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h" |
||||
#include "src/proto/grpc/testing/xds/v3/orca_service.pb.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
using xds::data::orca::v3::OrcaLoadReport; |
||||
using xds::service::orca::v3::OpenRcaService; |
||||
using xds::service::orca::v3::OrcaLoadReportRequest; |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
using experimental::OrcaService; |
||||
|
||||
class OrcaServiceEnd2endTest : public ::testing::Test { |
||||
protected: |
||||
// A wrapper for the client stream that ensures that responses come
|
||||
// back at the requested interval.
|
||||
class Stream { |
||||
public: |
||||
Stream(OpenRcaService::Stub* stub, grpc_core::Duration requested_interval) |
||||
: requested_interval_(requested_interval) { |
||||
OrcaLoadReportRequest request; |
||||
gpr_timespec timespec = requested_interval.as_timespec(); |
||||
auto* interval_proto = request.mutable_report_interval(); |
||||
interval_proto->set_seconds(timespec.tv_sec); |
||||
interval_proto->set_nanos(timespec.tv_nsec); |
||||
stream_ = stub->StreamCoreMetrics(&context_, request); |
||||
} |
||||
|
||||
~Stream() { context_.TryCancel(); } |
||||
|
||||
OrcaLoadReport ReadResponse() { |
||||
OrcaLoadReport response; |
||||
EXPECT_TRUE(stream_->Read(&response)); |
||||
auto now = grpc_core::Timestamp::FromTimespecRoundDown( |
||||
gpr_now(GPR_CLOCK_MONOTONIC)); |
||||
if (last_response_time_.has_value()) { |
||||
// Allow a small fudge factor to avoid test flakiness.
|
||||
const grpc_core::Duration fudge_factor = |
||||
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(); |
||||
auto elapsed = now - *last_response_time_; |
||||
EXPECT_GE(elapsed, requested_interval_ - fudge_factor) |
||||
<< elapsed.ToString(); |
||||
EXPECT_LE(elapsed, requested_interval_ + fudge_factor) |
||||
<< elapsed.ToString(); |
||||
} |
||||
last_response_time_ = now; |
||||
return response; |
||||
} |
||||
|
||||
private: |
||||
const grpc_core::Duration requested_interval_; |
||||
ClientContext context_; |
||||
std::unique_ptr<grpc::ClientReaderInterface<OrcaLoadReport>> stream_; |
||||
absl::optional<grpc_core::Timestamp> last_response_time_; |
||||
}; |
||||
|
||||
OrcaServiceEnd2endTest() |
||||
: orca_service_(OrcaService::Options().set_min_report_duration( |
||||
absl::ZeroDuration())) { |
||||
std::string server_address = |
||||
absl::StrCat("localhost:", grpc_pick_unused_port_or_die()); |
||||
ServerBuilder builder; |
||||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); |
||||
builder.RegisterService(&orca_service_); |
||||
server_ = builder.BuildAndStart(); |
||||
gpr_log(GPR_INFO, "server started on %s", server_address_.c_str()); |
||||
auto channel = CreateChannel(server_address, InsecureChannelCredentials()); |
||||
stub_ = OpenRcaService::NewStub(channel); |
||||
} |
||||
|
||||
~OrcaServiceEnd2endTest() override { server_->Shutdown(); } |
||||
|
||||
std::string server_address_; |
||||
OrcaService orca_service_; |
||||
std::unique_ptr<Server> server_; |
||||
std::unique_ptr<OpenRcaService::Stub> stub_; |
||||
}; |
||||
|
||||
TEST_F(OrcaServiceEnd2endTest, Basic) { |
||||
constexpr char kMetricName1[] = "foo"; |
||||
constexpr char kMetricName2[] = "bar"; |
||||
constexpr char kMetricName3[] = "baz"; |
||||
constexpr char kMetricName4[] = "quux"; |
||||
// Start stream1 with 5s interval and stream2 with 2.5s interval.
|
||||
// Throughout the test, we should get two responses on stream2 for
|
||||
// every one response on stream1.
|
||||
Stream stream1(stub_.get(), grpc_core::Duration::Milliseconds(5000)); |
||||
Stream stream2(stub_.get(), grpc_core::Duration::Milliseconds(2500)); |
||||
auto ReadResponses = [&](std::function<void(const OrcaLoadReport&)> checker) { |
||||
gpr_log(GPR_INFO, "reading response from stream1"); |
||||
OrcaLoadReport response = stream1.ReadResponse(); |
||||
checker(response); |
||||
gpr_log(GPR_INFO, "reading response from stream2"); |
||||
response = stream2.ReadResponse(); |
||||
checker(response); |
||||
gpr_log(GPR_INFO, "reading response from stream2"); |
||||
response = stream2.ReadResponse(); |
||||
checker(response); |
||||
}; |
||||
// Initial response should not have any values populated.
|
||||
ReadResponses([](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0); |
||||
EXPECT_EQ(response.mem_utilization(), 0); |
||||
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); |
||||
}); |
||||
// Now set CPU utilization on the server.
|
||||
orca_service_.SetCpuUtilization(0.5); |
||||
ReadResponses([](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0.5); |
||||
EXPECT_EQ(response.mem_utilization(), 0); |
||||
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); |
||||
}); |
||||
// Update CPU utilization and set memory utilization.
|
||||
orca_service_.SetCpuUtilization(0.8); |
||||
orca_service_.SetMemoryUtilization(0.4); |
||||
ReadResponses([](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0.8); |
||||
EXPECT_EQ(response.mem_utilization(), 0.4); |
||||
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre()); |
||||
}); |
||||
// Unset CPU and memory utilization and set a named utilization.
|
||||
orca_service_.DeleteCpuUtilization(); |
||||
orca_service_.DeleteMemoryUtilization(); |
||||
orca_service_.SetNamedUtilization(kMetricName1, 0.3); |
||||
ReadResponses([&](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0); |
||||
EXPECT_EQ(response.mem_utilization(), 0); |
||||
EXPECT_THAT( |
||||
response.utilization(), |
||||
::testing::UnorderedElementsAre(::testing::Pair(kMetricName1, 0.3))); |
||||
}); |
||||
// Unset the previous named utilization and set two new ones.
|
||||
orca_service_.DeleteNamedUtilization(kMetricName1); |
||||
orca_service_.SetNamedUtilization(kMetricName2, 0.2); |
||||
orca_service_.SetNamedUtilization(kMetricName3, 0.1); |
||||
ReadResponses([&](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0); |
||||
EXPECT_EQ(response.mem_utilization(), 0); |
||||
EXPECT_THAT( |
||||
response.utilization(), |
||||
::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.2), |
||||
::testing::Pair(kMetricName3, 0.1))); |
||||
}); |
||||
// Replace the entire named metric map at once.
|
||||
orca_service_.SetAllNamedUtilization( |
||||
{{kMetricName2, 0.5}, {kMetricName4, 0.9}}); |
||||
ReadResponses([&](const OrcaLoadReport& response) { |
||||
EXPECT_EQ(response.cpu_utilization(), 0); |
||||
EXPECT_EQ(response.mem_utilization(), 0); |
||||
EXPECT_THAT( |
||||
response.utilization(), |
||||
::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.5), |
||||
::testing::Pair(kMetricName4, 0.9))); |
||||
}); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue