mirror of https://github.com/grpc/grpc.git
server: introduce ServerMetricRecorder API and move per-call reporting from a C++ interceptor to a C-core filter (#32106)
* backend metric sampling * Comments addressed. * More comments addressed. * Pushing changes left behind locally. * Removed empty lines * Update OrcaService to use ServerMetricRecorder (no named metrics yet) * Comments addressed. * More comments addressed * More comments addressed. * Comments fixed * Comments addressed. * Test fixed * make seq returned always up-to-date * skip atomic load when not cached * Fixed ABSL_GUARDED_BY * Comments addressed except client_lb_end2end_test * test updated * Comments addressed * BUILD fix. * BackendMetricDataState moved to a separate header * comments addressed * Fixed clang and buildifier errors * More sanity check errors fixed. * Fixed xds tests * Ran generate_projects.sh * Comments addressed * comments addressed. * generate project * Build fixed * generate project * sanity check errors fixed * test fixed * Backup poller period override moved to main() * Also move cfstream override * Clang fixes, sanitize * generate_projects.sh * portable print format fix * Removed outdated commentpull/32272/head
parent
95b2f0c8ca
commit
c7f641da0d
43 changed files with 1198 additions and 634 deletions
@ -0,0 +1,108 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPCPP_EXT_SERVER_METRIC_RECORDER_H |
||||
#define GRPCPP_EXT_SERVER_METRIC_RECORDER_H |
||||
|
||||
#include <functional> |
||||
#include <map> |
||||
#include <memory> |
||||
|
||||
#include <grpcpp/impl/sync.h> |
||||
#include <grpcpp/support/string_ref.h> |
||||
|
||||
namespace grpc_core { |
||||
struct BackendMetricData; |
||||
} // namespace grpc_core
|
||||
|
||||
namespace grpc { |
||||
class BackendMetricState; |
||||
|
||||
namespace experimental { |
||||
/// Records server wide metrics to be reported to the client.
|
||||
/// Server implementation creates an instance and reports server metrics to it,
|
||||
/// and then passes it to
|
||||
/// ServerBuilder::experimental_type::EnableCallMetricRecording or
|
||||
/// experimental::OrcaService that read metrics to include in the report.
|
||||
class ServerMetricRecorder { |
||||
public: |
||||
// Factory method. Use this to create.
|
||||
static std::unique_ptr<ServerMetricRecorder> Create(); |
||||
/// Records the server CPU utilization in the range [0, 1].
|
||||
/// Values outside of the valid range are rejected.
|
||||
/// Overrides the stored value when called again with a valid value.
|
||||
void SetCpuUtilization(double value); |
||||
/// Records the server memory utilization in the range [0, 1].
|
||||
/// Values outside of the valid range are rejected.
|
||||
/// Overrides the stored value when called again with a valid value.
|
||||
void SetMemoryUtilization(double value); |
||||
/// Records number of queries per second to the server in the range [0, infy).
|
||||
/// Values outside of the valid range are rejected.
|
||||
/// Overrides the stored value when called again with a valid value.
|
||||
void SetQps(double value); |
||||
/// Records a named resource utilization value in the range [0, 1].
|
||||
/// Values outside of the valid range are rejected.
|
||||
/// Overrides the stored value when called again with the same name.
|
||||
/// The name string should remain valid while this utilization remains
|
||||
/// in this recorder. It is assumed that strings are common names that are
|
||||
/// global constants.
|
||||
void SetNamedUtilization(string_ref name, double value); |
||||
/// Replaces all named resource utilization values. No range validation.
|
||||
/// The name strings should remain valid while utilization values remain
|
||||
/// in this recorder. It is assumed that strings are common names that are
|
||||
/// global constants.
|
||||
void SetAllNamedUtilization(std::map<string_ref, double> named_utilization); |
||||
|
||||
/// Clears the server CPU utilization if recorded.
|
||||
void ClearCpuUtilization(); |
||||
/// Clears the server memory utilization if recorded.
|
||||
void ClearMemoryUtilization(); |
||||
/// Clears number of queries per second to the server if recorded.
|
||||
void ClearQps(); |
||||
/// Clears a named utilization value if exists.
|
||||
void ClearNamedUtilization(string_ref name); |
||||
|
||||
private: |
||||
// To access GetMetrics().
|
||||
friend class grpc::BackendMetricState; |
||||
friend class OrcaService; |
||||
|
||||
struct BackendMetricDataState; |
||||
|
||||
// No direct creation, use the factory method Create() above.
|
||||
ServerMetricRecorder(); |
||||
|
||||
// Updates the metric state by applying `updater` to the data and incrementing
|
||||
// the sequence number.
|
||||
void UpdateBackendMetricDataState( |
||||
std::function<void(grpc_core::BackendMetricData*)> updater); |
||||
|
||||
grpc_core::BackendMetricData GetMetrics() const; |
||||
// Returned metric data is guaranteed to be identical between two calls if the
|
||||
// sequence numbers match.
|
||||
std::shared_ptr<const BackendMetricDataState> GetMetricsIfChanged() const; |
||||
|
||||
mutable grpc::internal::Mutex mu_; |
||||
std::shared_ptr<const BackendMetricDataState> metric_state_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPCPP_EXT_SERVER_METRIC_RECORDER_H
|
@ -0,0 +1,148 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/backend_metrics/backend_metric_filter.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stddef.h> |
||||
|
||||
#include <functional> |
||||
#include <map> |
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "upb/upb.h" |
||||
#include "upb/upb.hpp" |
||||
#include "xds/data/orca/v3/orca_load_report.upb.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/channel_stack_builder.h" |
||||
#include "src/core/lib/channel/context.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/surface/channel_stack_type.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_backend_metric_filter_trace(false, "backend_metric_filter"); |
||||
|
||||
absl::optional<std::string> BackendMetricFilter::MaybeSerializeBackendMetrics( |
||||
BackendMetricProvider* provider) const { |
||||
if (provider == nullptr) return absl::nullopt; |
||||
BackendMetricData data = provider->GetBackendMetricData(); |
||||
upb::Arena arena; |
||||
xds_data_orca_v3_OrcaLoadReport* response = |
||||
xds_data_orca_v3_OrcaLoadReport_new(arena.ptr()); |
||||
bool has_data = false; |
||||
if (data.cpu_utilization != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response, |
||||
data.cpu_utilization); |
||||
has_data = true; |
||||
} |
||||
if (data.mem_utilization != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response, |
||||
data.mem_utilization); |
||||
has_data = true; |
||||
} |
||||
if (data.qps != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps); |
||||
has_data = true; |
||||
} |
||||
for (const auto& p : data.request_cost) { |
||||
xds_data_orca_v3_OrcaLoadReport_request_cost_set( |
||||
response, |
||||
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), |
||||
p.second, arena.ptr()); |
||||
has_data = true; |
||||
} |
||||
for (const auto& p : data.utilization) { |
||||
xds_data_orca_v3_OrcaLoadReport_utilization_set( |
||||
response, |
||||
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), |
||||
p.second, arena.ptr()); |
||||
has_data = true; |
||||
} |
||||
if (!has_data) { |
||||
return absl::nullopt; |
||||
} |
||||
size_t len; |
||||
char* buf = |
||||
xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(), &len); |
||||
return std::string(buf, len); |
||||
} |
||||
|
||||
const grpc_channel_filter BackendMetricFilter::kFilter = |
||||
MakePromiseBasedFilter<BackendMetricFilter, FilterEndpoint::kServer>( |
||||
"backend_metric"); |
||||
|
||||
absl::StatusOr<BackendMetricFilter> BackendMetricFilter::Create( |
||||
const ChannelArgs&, ChannelFilter::Args) { |
||||
return BackendMetricFilter(); |
||||
} |
||||
|
||||
ArenaPromise<ServerMetadataHandle> BackendMetricFilter::MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||
return ArenaPromise<ServerMetadataHandle>(Map( |
||||
next_promise_factory(std::move(call_args)), |
||||
[this](ServerMetadataHandle trailing_metadata) { |
||||
auto* ctx = &GetContext< |
||||
grpc_call_context_element>()[GRPC_CONTEXT_BACKEND_METRIC_PROVIDER]; |
||||
if (ctx == nullptr) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_filter_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] No BackendMetricProvider.", this); |
||||
} |
||||
return trailing_metadata; |
||||
} |
||||
absl::optional<std::string> serialized = MaybeSerializeBackendMetrics( |
||||
reinterpret_cast<BackendMetricProvider*>(ctx->value)); |
||||
if (serialized.has_value() && !serialized->empty()) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_filter_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[%p] Backend metrics serialized. size: %" PRIuPTR, this, |
||||
serialized->size()); |
||||
} |
||||
trailing_metadata->Set( |
||||
EndpointLoadMetricsBinMetadata(), |
||||
Slice::FromCopiedString(std::move(*serialized))); |
||||
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_filter_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] No backend metrics.", this); |
||||
} |
||||
return trailing_metadata; |
||||
})); |
||||
} |
||||
|
||||
void RegisterBackendMetricFilter(CoreConfiguration::Builder* builder) { |
||||
builder->channel_init()->RegisterStage( |
||||
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) { |
||||
if (builder->channel_args().Contains( |
||||
GRPC_ARG_SERVER_CALL_METRIC_RECORDING)) { |
||||
builder->PrependFilter(&BackendMetricFilter::kFilter); |
||||
} |
||||
return true; |
||||
}); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,56 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2023 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_FILTER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_FILTER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <string> |
||||
|
||||
#include "absl/status/statusor.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/ext/filters/backend_metrics/backend_metric_provider.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/promise/arena_promise.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class BackendMetricFilter : public ChannelFilter { |
||||
public: |
||||
static const grpc_channel_filter kFilter; |
||||
|
||||
static absl::StatusOr<BackendMetricFilter> Create(const ChannelArgs& args, |
||||
ChannelFilter::Args); |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||
|
||||
private: |
||||
absl::optional<std::string> MaybeSerializeBackendMetrics( |
||||
BackendMetricProvider* provider) const; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_FILTER_H
|
@ -0,0 +1,33 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2023 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_PROVIDER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_PROVIDER_H |
||||
|
||||
namespace grpc_core { |
||||
|
||||
struct BackendMetricData; |
||||
class BackendMetricProvider { |
||||
public: |
||||
virtual ~BackendMetricProvider() = default; |
||||
virtual BackendMetricData GetBackendMetricData() = 0; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_BACKEND_METRICS_BACKEND_METRIC_PROVIDER_H
|
@ -0,0 +1,311 @@ |
||||
//
|
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "src/cpp/server/backend_metric_recorder.h" |
||||
|
||||
#include <inttypes.h> |
||||
|
||||
#include <functional> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include <grpc/support/log.h> |
||||
#include <grpcpp/ext/call_metric_recorder.h> |
||||
#include <grpcpp/ext/server_metric_recorder.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
using grpc_core::BackendMetricData; |
||||
|
||||
namespace { |
||||
// All utilization values must be in [0, 1].
|
||||
bool IsUtilizationValid(double utilization) { |
||||
return utilization >= 0.0 && utilization <= 1.0; |
||||
} |
||||
|
||||
// QPS must be in [0, infy).
|
||||
bool IsQpsValid(double qps) { return qps >= 0.0; } |
||||
|
||||
grpc_core::TraceFlag grpc_backend_metric_trace(false, "backend_metric"); |
||||
} // namespace
|
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() { |
||||
return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder()); |
||||
} |
||||
|
||||
ServerMetricRecorder::ServerMetricRecorder() |
||||
: metric_state_(std::make_shared<const BackendMetricDataState>()) {} |
||||
|
||||
void ServerMetricRecorder::UpdateBackendMetricDataState( |
||||
std::function<void(BackendMetricData*)> updater) { |
||||
internal::MutexLock lock(&mu_); |
||||
auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_); |
||||
updater(&new_state->data); |
||||
++new_state->sequence_number; |
||||
metric_state_ = std::move(new_state); |
||||
} |
||||
|
||||
void ServerMetricRecorder::SetCpuUtilization(double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] CPU utilization rejected: %f", this, value); |
||||
} |
||||
return; |
||||
} |
||||
UpdateBackendMetricDataState( |
||||
[value](BackendMetricData* data) { data->cpu_utilization = value; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] CPU utilization set: %f", this, value); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::SetMemoryUtilization(double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Mem utilization rejected: %f", this, value); |
||||
} |
||||
return; |
||||
} |
||||
UpdateBackendMetricDataState( |
||||
[value](BackendMetricData* data) { data->mem_utilization = value; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Mem utilization set: %f", this, value); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::SetQps(double value) { |
||||
if (!IsQpsValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] QPS rejected: %f", this, value); |
||||
} |
||||
return; |
||||
} |
||||
UpdateBackendMetricDataState( |
||||
[value](BackendMetricData* data) { data->qps = value; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] QPS set: %f", this, value); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Named utilization rejected: %f name: %s", this, |
||||
value, std::string(name.data(), name.size()).c_str()); |
||||
} |
||||
return; |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Named utilization set: %f name: %s", this, value, |
||||
std::string(name.data(), name.size()).c_str()); |
||||
} |
||||
UpdateBackendMetricDataState([name, value](BackendMetricData* data) { |
||||
data->utilization[absl::string_view(name.data(), name.size())] = value; |
||||
}); |
||||
} |
||||
|
||||
void ServerMetricRecorder::SetAllNamedUtilization( |
||||
std::map<string_ref, double> named_utilization) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] All named utilization updated. size: %" PRIuPTR, |
||||
this, named_utilization.size()); |
||||
} |
||||
UpdateBackendMetricDataState( |
||||
[utilization = std::move(named_utilization)](BackendMetricData* data) { |
||||
data->utilization.clear(); |
||||
for (const auto& u : utilization) { |
||||
data->utilization[absl::string_view(u.first.data(), u.first.size())] = |
||||
u.second; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
void ServerMetricRecorder::ClearCpuUtilization() { |
||||
UpdateBackendMetricDataState( |
||||
[](BackendMetricData* data) { data->cpu_utilization = -1; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] CPU utilization cleared.", this); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::ClearMemoryUtilization() { |
||||
UpdateBackendMetricDataState( |
||||
[](BackendMetricData* data) { data->mem_utilization = -1; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Mem utilization cleared.", this); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::ClearQps() { |
||||
UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; }); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] QPS utilization cleared.", this); |
||||
} |
||||
} |
||||
|
||||
void ServerMetricRecorder::ClearNamedUtilization(string_ref name) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Named utilization cleared. name: %s", this, |
||||
std::string(name.data(), name.size()).c_str()); |
||||
} |
||||
UpdateBackendMetricDataState([name](BackendMetricData* data) { |
||||
data->utilization.erase(absl::string_view(name.data(), name.size())); |
||||
}); |
||||
} |
||||
|
||||
grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const { |
||||
auto result = GetMetricsIfChanged(); |
||||
return result->data; |
||||
} |
||||
|
||||
std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState> |
||||
ServerMetricRecorder::GetMetricsIfChanged() const { |
||||
std::shared_ptr<const BackendMetricDataState> result; |
||||
{ |
||||
internal::MutexLock lock(&mu_); |
||||
result = metric_state_; |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
const auto& data = result->data; |
||||
gpr_log(GPR_INFO, |
||||
"[%p] GetMetrics() returned: seq:%" PRIu64 |
||||
" cpu:%f mem:%f qps:%f utilization size: %" PRIuPTR, |
||||
this, result->sequence_number, data.cpu_utilization, |
||||
data.mem_utilization, data.qps, data.utilization.size()); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
|
||||
experimental::CallMetricRecorder& |
||||
BackendMetricState::RecordCpuUtilizationMetric(double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] CPU utilization value rejected: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
cpu_utilization_.store(value, std::memory_order_relaxed); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] CPU utilization recorded: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
experimental::CallMetricRecorder& |
||||
BackendMetricState::RecordMemoryUtilizationMetric(double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Mem utilization value rejected: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
mem_utilization_.store(value, std::memory_order_relaxed); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Mem utilization recorded: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric( |
||||
double value) { |
||||
if (!IsQpsValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] QPS value rejected: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
qps_.store(value, std::memory_order_relaxed); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] QPS recorded: %f", this, value); |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric( |
||||
string_ref name, double value) { |
||||
if (!IsUtilizationValid(value)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Utilization value rejected: %s %f", this, |
||||
std::string(name.data(), name.length()).c_str(), value); |
||||
} |
||||
return *this; |
||||
} |
||||
internal::MutexLock lock(&mu_); |
||||
absl::string_view name_sv(name.data(), name.length()); |
||||
utilization_[name_sv] = value; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Utilization recorded: %s %f", this, |
||||
std::string(name_sv).c_str(), value); |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric( |
||||
string_ref name, double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
absl::string_view name_sv(name.data(), name.length()); |
||||
request_cost_[name_sv] = value; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, "[%p] Request cost recorded: %s %f", this, |
||||
std::string(name_sv).c_str(), value); |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
BackendMetricData BackendMetricState::GetBackendMetricData() { |
||||
// Merge metrics from the ServerMetricRecorder first since metrics recorded
|
||||
// to CallMetricRecorder takes a higher precedence.
|
||||
BackendMetricData data; |
||||
if (server_metric_recorder_ != nullptr) { |
||||
data = server_metric_recorder_->GetMetrics(); |
||||
} |
||||
// Only overwrite if the value is set i.e. in the valid range.
|
||||
const double cpu = cpu_utilization_.load(std::memory_order_relaxed); |
||||
if (IsUtilizationValid(cpu)) { |
||||
data.cpu_utilization = cpu; |
||||
} |
||||
const double mem = mem_utilization_.load(std::memory_order_relaxed); |
||||
if (IsUtilizationValid(mem)) { |
||||
data.mem_utilization = mem; |
||||
} |
||||
const double qps = qps_.load(std::memory_order_relaxed); |
||||
if (IsQpsValid(qps)) { |
||||
data.qps = qps; |
||||
} |
||||
{ |
||||
internal::MutexLock lock(&mu_); |
||||
data.utilization = std::move(utilization_); |
||||
data.request_cost = std::move(request_cost_); |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[%p] Backend metric data returned: cpu:%f mem:%f qps:%f " |
||||
"utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR, |
||||
this, data.cpu_utilization, data.mem_utilization, data.qps, |
||||
data.utilization.size(), data.request_cost.size()); |
||||
} |
||||
return data; |
||||
} |
||||
|
||||
} // namespace grpc
|
@ -0,0 +1,81 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CPP_SERVER_BACKEND_METRIC_RECORDER_H |
||||
#define GRPC_SRC_CPP_SERVER_BACKEND_METRIC_RECORDER_H |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <atomic> |
||||
#include <map> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpcpp/ext/call_metric_recorder.h> |
||||
#include <grpcpp/ext/server_metric_recorder.h> |
||||
#include <grpcpp/impl/sync.h> |
||||
#include <grpcpp/support/string_ref.h> |
||||
|
||||
#include "src/core/ext/filters/backend_metrics/backend_metric_provider.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" |
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
// Backend metrics and an associated update sequence number.
|
||||
struct ServerMetricRecorder::BackendMetricDataState { |
||||
grpc_core::BackendMetricData data; |
||||
uint64_t sequence_number = 0; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
|
||||
class BackendMetricState : public grpc_core::BackendMetricProvider, |
||||
public experimental::CallMetricRecorder { |
||||
public: |
||||
// `server_metric_recorder` is optional. When set, GetBackendMetricData()
|
||||
// merges metrics from `server_metric_recorder` with metrics recorded to this.
|
||||
explicit BackendMetricState( |
||||
experimental::ServerMetricRecorder* server_metric_recorder) |
||||
: server_metric_recorder_(server_metric_recorder) {} |
||||
experimental::CallMetricRecorder& RecordCpuUtilizationMetric( |
||||
double value) override; |
||||
experimental::CallMetricRecorder& RecordMemoryUtilizationMetric( |
||||
double value) override; |
||||
experimental::CallMetricRecorder& RecordQpsMetric(double value) override; |
||||
experimental::CallMetricRecorder& RecordUtilizationMetric( |
||||
string_ref name, double value) override; |
||||
experimental::CallMetricRecorder& RecordRequestCostMetric( |
||||
string_ref name, double value) override; |
||||
// This clears metrics currently recorded. Don't call twice.
|
||||
grpc_core::BackendMetricData GetBackendMetricData() override; |
||||
|
||||
private: |
||||
experimental::ServerMetricRecorder* server_metric_recorder_; |
||||
std::atomic<double> cpu_utilization_{-1.0}; |
||||
std::atomic<double> mem_utilization_{-1.0}; |
||||
std::atomic<double> qps_{-1.0}; |
||||
internal::Mutex mu_; |
||||
std::map<absl::string_view, double> utilization_ ABSL_GUARDED_BY(mu_); |
||||
std::map<absl::string_view, double> request_cost_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_SERVER_BACKEND_METRIC_RECORDER_H
|
@ -1,125 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <map> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
#include "upb/upb.h" |
||||
#include "upb/upb.hpp" |
||||
#include "xds/data/orca/v3/orca_load_report.upb.h" |
||||
|
||||
#include <grpcpp/ext/call_metric_recorder.h> |
||||
#include <grpcpp/impl/sync.h> |
||||
#include <grpcpp/support/string_ref.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
CallMetricRecorder::CallMetricRecorder(grpc_core::Arena* arena) |
||||
: backend_metric_data_(arena->New<grpc_core::BackendMetricData>()) {} |
||||
|
||||
CallMetricRecorder::~CallMetricRecorder() { |
||||
backend_metric_data_->~BackendMetricData(); |
||||
} |
||||
|
||||
CallMetricRecorder& CallMetricRecorder::RecordCpuUtilizationMetric( |
||||
double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
backend_metric_data_->cpu_utilization = value; |
||||
return *this; |
||||
} |
||||
|
||||
CallMetricRecorder& CallMetricRecorder::RecordMemoryUtilizationMetric( |
||||
double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
backend_metric_data_->mem_utilization = value; |
||||
return *this; |
||||
} |
||||
|
||||
CallMetricRecorder& CallMetricRecorder::RecordQpsMetric(double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
backend_metric_data_->qps = value; |
||||
return *this; |
||||
} |
||||
|
||||
CallMetricRecorder& CallMetricRecorder::RecordUtilizationMetric( |
||||
grpc::string_ref name, double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
absl::string_view name_sv(name.data(), name.length()); |
||||
backend_metric_data_->utilization[name_sv] = value; |
||||
return *this; |
||||
} |
||||
|
||||
CallMetricRecorder& CallMetricRecorder::RecordRequestCostMetric( |
||||
grpc::string_ref name, double value) { |
||||
internal::MutexLock lock(&mu_); |
||||
absl::string_view name_sv(name.data(), name.length()); |
||||
backend_metric_data_->request_cost[name_sv] = value; |
||||
return *this; |
||||
} |
||||
|
||||
absl::optional<std::string> CallMetricRecorder::CreateSerializedReport() { |
||||
upb::Arena arena; |
||||
internal::MutexLock lock(&mu_); |
||||
bool has_data = backend_metric_data_->cpu_utilization != -1 || |
||||
backend_metric_data_->mem_utilization != -1 || |
||||
!backend_metric_data_->utilization.empty() || |
||||
!backend_metric_data_->request_cost.empty(); |
||||
if (!has_data) { |
||||
return absl::nullopt; |
||||
} |
||||
xds_data_orca_v3_OrcaLoadReport* response = |
||||
xds_data_orca_v3_OrcaLoadReport_new(arena.ptr()); |
||||
if (backend_metric_data_->cpu_utilization != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization( |
||||
response, backend_metric_data_->cpu_utilization); |
||||
} |
||||
if (backend_metric_data_->mem_utilization != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization( |
||||
response, backend_metric_data_->mem_utilization); |
||||
} |
||||
if (backend_metric_data_->qps != -1) { |
||||
xds_data_orca_v3_OrcaLoadReport_set_rps_fractional( |
||||
response, backend_metric_data_->qps); |
||||
} |
||||
for (const auto& p : backend_metric_data_->request_cost) { |
||||
xds_data_orca_v3_OrcaLoadReport_request_cost_set( |
||||
response, |
||||
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), |
||||
p.second, arena.ptr()); |
||||
} |
||||
for (const auto& p : backend_metric_data_->utilization) { |
||||
xds_data_orca_v3_OrcaLoadReport_utilization_set( |
||||
response, |
||||
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), |
||||
p.second, arena.ptr()); |
||||
} |
||||
size_t buf_length; |
||||
char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(), |
||||
&buf_length); |
||||
return std::string(buf, buf_length); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
@ -1,76 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "src/cpp/server/orca/orca_interceptor.h" |
||||
|
||||
#include <algorithm> |
||||
#include <map> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
#include <vector> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpcpp/ext/call_metric_recorder.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
|
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc { |
||||
namespace experimental { |
||||
|
||||
void OrcaServerInterceptor::Intercept(InterceptorBatchMethods* methods) { |
||||
if (methods->QueryInterceptionHookPoint( |
||||
InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) { |
||||
auto context = info_->server_context(); |
||||
context->CreateCallMetricRecorder(); |
||||
} else if (methods->QueryInterceptionHookPoint( |
||||
InterceptionHookPoints::PRE_SEND_STATUS)) { |
||||
auto trailers = methods->GetSendTrailingMetadata(); |
||||
if (trailers != nullptr) { |
||||
auto context = info_->server_context(); |
||||
auto* recorder = context->call_metric_recorder_; |
||||
auto serialized = recorder->CreateSerializedReport(); |
||||
if (serialized.has_value() && !serialized->empty()) { |
||||
std::string key = |
||||
std::string(grpc_core::EndpointLoadMetricsBinMetadata::key()); |
||||
trailers->emplace( |
||||
std::make_pair(std::move(key), std::move(serialized.value()))); |
||||
} |
||||
} |
||||
} |
||||
methods->Proceed(); |
||||
} |
||||
|
||||
Interceptor* OrcaServerInterceptorFactory::CreateServerInterceptor( |
||||
ServerRpcInfo* info) { |
||||
return new OrcaServerInterceptor(info); |
||||
} |
||||
|
||||
void OrcaServerInterceptorFactory::Register(grpc::ServerBuilder* builder) { |
||||
builder->internal_interceptor_creators_.push_back( |
||||
std::make_unique<OrcaServerInterceptorFactory>()); |
||||
} |
||||
|
||||
void EnableCallMetricRecording(grpc::ServerBuilder* builder) { |
||||
OrcaServerInterceptorFactory::Register(builder); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
@ -1,49 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CPP_SERVER_ORCA_ORCA_INTERCEPTOR_H |
||||
#define GRPC_SRC_CPP_SERVER_ORCA_ORCA_INTERCEPTOR_H |
||||
|
||||
#include <grpcpp/support/interceptor.h> |
||||
#include <grpcpp/support/server_interceptor.h> |
||||
|
||||
namespace grpc { |
||||
|
||||
class ServerBuilder; |
||||
|
||||
namespace experimental { |
||||
class ServerRpcInfo; |
||||
|
||||
class OrcaServerInterceptor : public Interceptor { |
||||
public: |
||||
explicit OrcaServerInterceptor(ServerRpcInfo* info) : info_(info) {} |
||||
|
||||
void Intercept(InterceptorBatchMethods* methods) override; |
||||
|
||||
private: |
||||
ServerRpcInfo* info_; |
||||
}; |
||||
|
||||
class OrcaServerInterceptorFactory : public ServerInterceptorFactoryInterface { |
||||
public: |
||||
static void Register(ServerBuilder* builder); |
||||
Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_SERVER_ORCA_ORCA_INTERCEPTOR_H
|
Loading…
Reference in new issue