[call-v3] Move server load reporting filter to new api (#35471)

Closes #35471

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35471 from ctiller:slrf b9e685c1c1
PiperOrigin-RevId: 597040097
pull/35395/head^2
Craig Tiller 1 year ago committed by Copybara-Service
parent 4105425da5
commit 5150b2346d
  1. 128
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  2. 22
      src/core/ext/filters/load_reporting/server_load_reporting_filter.h
  3. 43
      src/core/lib/channel/promise_based_filter.h

@ -72,6 +72,10 @@ constexpr char kEncodedIpv4AddressLengthString[] = "08";
constexpr char kEncodedIpv6AddressLengthString[] = "32";
constexpr char kEmptyAddressLengthString[] = "00";
const NoInterceptor ServerLoadReportingFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerLoadReportingFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerLoadReportingFilter::Call::OnServerToClientMessage;
absl::StatusOr<ServerLoadReportingFilter> ServerLoadReportingFilter::Create(
const ChannelArgs& channel_args, ChannelFilter::Args) {
// Find and record the peer_identity.
@ -93,9 +97,9 @@ absl::StatusOr<ServerLoadReportingFilter> ServerLoadReportingFilter::Create(
namespace {
std::string GetCensusSafeClientIpString(
const ClientMetadataHandle& initial_metadata) {
const ClientMetadata& initial_metadata) {
// Find the client URI string.
Slice* client_uri_slice = initial_metadata->get_pointer(PeerString());
const Slice* client_uri_slice = initial_metadata.get_pointer(PeerString());
if (client_uri_slice == nullptr) {
gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC "
@ -139,8 +143,8 @@ std::string GetCensusSafeClientIpString(
}
}
std::string MakeClientIpAndLrToken(
absl::string_view lr_token, const ClientMetadataHandle& initial_metadata) {
std::string MakeClientIpAndLrToken(absl::string_view lr_token,
const ClientMetadata& initial_metadata) {
std::string client_ip = GetCensusSafeClientIpString(initial_metadata);
absl::string_view prefix;
switch (client_ip.length()) {
@ -176,82 +180,68 @@ const char* GetStatusTagForStatus(grpc_status_code status) {
}
} // namespace
ArenaPromise<ServerMetadataHandle> ServerLoadReportingFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
void ServerLoadReportingFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerLoadReportingFilter* filter) {
// Gather up basic facts about the request
Slice service_method;
if (const Slice* path =
call_args.client_initial_metadata->get_pointer(HttpPathMetadata())) {
if (const Slice* path = md.get_pointer(HttpPathMetadata())) {
service_method = path->Ref();
}
std::string target_host;
if (const Slice* authority = call_args.client_initial_metadata->get_pointer(
HttpAuthorityMetadata())) {
target_host = absl::AsciiStrToLower(authority->as_string_view());
if (const Slice* authority = md.get_pointer(HttpAuthorityMetadata())) {
target_host_ = absl::AsciiStrToLower(authority->as_string_view());
}
std::string client_ip_and_lr_token;
auto lb_token = call_args.client_initial_metadata->Take(LbTokenMetadata())
.value_or(Slice());
client_ip_and_lr_token = MakeClientIpAndLrToken(
lb_token.as_string_view(), call_args.client_initial_metadata);
auto lb_token = md.Take(LbTokenMetadata()).value_or(Slice());
client_ip_and_lr_token_ =
MakeClientIpAndLrToken(lb_token.as_string_view(), md);
// Record the beginning of the request
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureStartCount(), 1}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(), client_ip_and_lr_token.length()}},
{client_ip_and_lr_token_.data(), client_ip_and_lr_token_.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host_.data(), target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{filter->peer_identity_.data(), filter->peer_identity_.length()}}});
}
void ServerLoadReportingFilter::Call::OnServerTrailingMetadata(
ServerMetadata& md, ServerLoadReportingFilter* filter) {
const auto& costs = md.Take(LbCostBinMetadata());
for (const auto& cost : costs) {
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureOtherCallMetric(), cost.cost}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token_.data(), client_ip_and_lr_token_.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host_.data(), target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{filter->peer_identity_.data(), filter->peer_identity_.length()}},
{::grpc::load_reporter::TagKeyMetricName(),
{cost.name.data(), cost.name.length()}}});
}
}
void ServerLoadReportingFilter::Call::OnFinalize(
const grpc_call_final_info* final_info, ServerLoadReportingFilter* filter) {
if (final_info == nullptr) return;
// After the last bytes have been placed on the wire we record
// final measurements
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureEndCount(), 1},
{::grpc::load_reporter::MeasureEndBytesSent(),
final_info->stats.transport_stream_stats.outgoing.data_bytes},
{::grpc::load_reporter::MeasureEndBytesReceived(),
final_info->stats.transport_stream_stats.incoming.data_bytes},
{::grpc::load_reporter::MeasureEndLatencyMs(),
gpr_time_to_millis(final_info->stats.latency)}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token_.data(), client_ip_and_lr_token_.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{target_host_.data(), target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}}});
// Returned promise runs the rest of the request, then reports costs and
// records measurements
return ArenaPromise<ServerMetadataHandle>(Seq(
// Call down the stack
next_promise_factory(std::move(call_args)),
// And then record the call result
[this, client_ip_and_lr_token,
target_host](ServerMetadataHandle trailing_metadata) {
const auto& costs = trailing_metadata->Take(LbCostBinMetadata());
for (const auto& cost : costs) {
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureOtherCallMetric(), cost.cost}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(),
client_ip_and_lr_token.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}},
{::grpc::load_reporter::TagKeyMetricName(),
{cost.name.data(), cost.name.length()}}});
}
GetContext<CallFinalization>()->Add([this, client_ip_and_lr_token,
target_host](
const grpc_call_final_info*
final_info) {
if (final_info == nullptr) return;
// After the last bytes have been placed on the wire we record
// final measurements
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureEndCount(), 1},
{::grpc::load_reporter::MeasureEndBytesSent(),
final_info->stats.transport_stream_stats.outgoing.data_bytes},
{::grpc::load_reporter::MeasureEndBytesReceived(),
final_info->stats.transport_stream_stats.incoming.data_bytes},
{::grpc::load_reporter::MeasureEndLatencyMs(),
gpr_time_to_millis(final_info->stats.latency)}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(),
client_ip_and_lr_token.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
});
return Immediate(std::move(trailing_metadata));
}));
{filter->peer_identity_.data(), filter->peer_identity_.length()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
}
namespace {

@ -34,7 +34,8 @@
namespace grpc_core {
class ServerLoadReportingFilter : public ChannelFilter {
class ServerLoadReportingFilter
: public ImplementChannelFilter<ServerLoadReportingFilter> {
public:
static absl::StatusOr<ServerLoadReportingFilter> Create(
const ChannelArgs& args, ChannelFilter::Args);
@ -43,9 +44,22 @@ class ServerLoadReportingFilter : public ChannelFilter {
const char* peer_identity() { return peer_identity_.c_str(); }
size_t peer_identity_len() { return peer_identity_.length(); }
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ServerLoadReportingFilter* filter);
static const NoInterceptor OnServerInitialMetadata;
void OnServerTrailingMetadata(ServerMetadata& md,
ServerLoadReportingFilter* filter);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
void OnFinalize(const grpc_call_final_info* final_info,
ServerLoadReportingFilter* filter);
private:
std::string client_ip_and_lr_token_;
std::string target_host_;
};
private:
// The peer's authenticated identity.

@ -331,6 +331,16 @@ auto MapResult(void (Derived::Call::*fn)(ServerMetadata&), Promise x,
});
}
template <typename Promise, typename Derived>
auto MapResult(void (Derived::Call::*fn)(ServerMetadata&, Derived*), Promise x,
FilterCallData<Derived>* call_data) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md, call_data->channel);
return md;
});
}
template <typename Interceptor, typename Derived, typename SfinaeVoid = void>
struct RunCallImpl;
@ -928,6 +938,19 @@ inline void InterceptServerTrailingMetadata(
});
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md, channel);
return md;
});
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
@ -941,11 +964,11 @@ inline void InterceptServerTrailingMetadata(
});
}
inline void InterceptFinalize(const NoInterceptor*, void*) {}
inline void InterceptFinalize(const NoInterceptor*, void*, void*) {}
template <class Call>
inline void InterceptFinalize(void (Call::*fn)(const grpc_call_final_info*),
Call* call) {
void*, Call* call) {
GPR_DEBUG_ASSERT(fn == &Call::OnFinalize);
GetContext<CallFinalization>()->Add(
[call](const grpc_call_final_info* final_info) {
@ -953,6 +976,17 @@ inline void InterceptFinalize(void (Call::*fn)(const grpc_call_final_info*),
});
}
template <class Derived>
inline void InterceptFinalize(
void (Derived::Call::*fn)(const grpc_call_final_info*, Derived*),
Derived* channel, typename Derived::Call* call) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnFinalize);
GetContext<CallFinalization>()->Add(
[call, channel](const grpc_call_final_info* final_info) {
call->OnFinalize(final_info, channel);
});
}
template <typename Derived>
absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value,
FilterCallData<Derived>*>
@ -1050,7 +1084,8 @@ class ImplementChannelFilter : public ChannelFilter {
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, call);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize,
static_cast<Derived*>(this), call);
}
// Polyfill for the original promise scheme.
@ -1067,7 +1102,7 @@ class ImplementChannelFilter : public ChannelFilter {
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call, call_args);
promise_filter_detail::InterceptFinalize(
&Derived::Call::OnFinalize,
&Derived::Call::OnFinalize, static_cast<Derived*>(this),
static_cast<typename Derived::Call*>(&call->call));
return promise_filter_detail::MapResult(
&Derived::Call::OnServerTrailingMetadata,

Loading…
Cancel
Save