Convert server load reporting to promises (#28927)

* Call finalization for promises

* Convert filter to a promise

* Call finalization for promises

* comment

* split out and test

* dont use promise_detail:: directly

* fix

* Automated change: Fix sanity tests

* fix

* ?

* fix

* finish it

* modernize

* fix-test

* fix

* Review feedback

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29249/head
Craig Tiller 3 years ago committed by GitHub
parent 44f2c004c5
commit 44167bdfd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 329
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  3. 82
      src/core/ext/filters/load_reporting/server_load_reporting_filter.h
  4. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 2
      src/core/lib/surface/call.cc
  6. 13
      src/core/lib/transport/metadata_batch.h
  7. 2
      test/cpp/end2end/server_load_reporting_end2end_test.cc

@ -3329,6 +3329,7 @@ grpc_cc_library(
], ],
language = "c++", language = "c++",
deps = [ deps = [
"config",
"error", "error",
"gpr", "gpr",
"grpc++_base", "grpc++_base",
@ -3336,6 +3337,7 @@ grpc_cc_library(
"grpc_lb_policy_grpclb", "grpc_lb_policy_grpclb",
"grpc_security_base", "grpc_security_base",
"grpc_sockaddr", "grpc_sockaddr",
"seq",
"slice", "slice",
"uri_parser", "uri_parser",
], ],

@ -34,12 +34,16 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h" #include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
@ -50,14 +54,12 @@ namespace grpc {
constexpr char kEncodedIpv4AddressLengthString[] = "08"; constexpr char kEncodedIpv4AddressLengthString[] = "08";
constexpr char kEncodedIpv6AddressLengthString[] = "32"; constexpr char kEncodedIpv6AddressLengthString[] = "32";
constexpr char kEmptyAddressLengthString[] = "00"; constexpr char kEmptyAddressLengthString[] = "00";
constexpr size_t kLengthPrefixSize = 2;
grpc_error_handle ServerLoadReportingChannelData::Init( absl::StatusOr<ServerLoadReportingFilter> ServerLoadReportingFilter::Create(
grpc_channel_element* /* elem */, grpc_channel_element_args* args) { const grpc_channel_args* args, grpc_core::ChannelFilter::Args) {
GPR_ASSERT(!args->is_last);
// Find and record the peer_identity. // Find and record the peer_identity.
const grpc_auth_context* auth_context = ServerLoadReportingFilter filter;
grpc_find_auth_context_in_args(args->channel_args); const grpc_auth_context* auth_context = grpc_find_auth_context_in_args(args);
if (auth_context != nullptr && if (auth_context != nullptr &&
grpc_auth_context_peer_is_authenticated(auth_context)) { grpc_auth_context_peer_is_authenticated(auth_context)) {
grpc_auth_property_iterator auth_it = grpc_auth_property_iterator auth_it =
@ -65,90 +67,26 @@ grpc_error_handle ServerLoadReportingChannelData::Init(
const grpc_auth_property* auth_property = const grpc_auth_property* auth_property =
grpc_auth_property_iterator_next(&auth_it); grpc_auth_property_iterator_next(&auth_it);
if (auth_property != nullptr) { if (auth_property != nullptr) {
peer_identity_ = auth_property->value; filter.peer_identity_ =
peer_identity_len_ = auth_property->value_length; std::string(auth_property->value, auth_property->value_length);
} }
} }
return GRPC_ERROR_NONE; return std::move(filter);
}
void ServerLoadReportingCallData::Destroy(
grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* /*then_call_closure*/) {
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
// Only record an end if we've recorded its corresponding start, which is
// indicated by a non-null client_ip_and_lr_token_. Note that it's possible
// that we attempt to record the call end before we have recorded the call
// start, because the data needed for recording the start comes from the
// initial metadata, which may not be ready before the call finishes.
if (client_ip_and_lr_token_ != nullptr) {
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureEndCount(), 1},
{::grpc::load_reporter::MeasureEndBytesSent(),
final_info->stats.transport_stream_stats.outgoing.data_bytes},
{::grpc::load_reporter::MeasureEndBytesReceived(),
final_info->stats.transport_stream_stats.incoming.data_bytes},
{::grpc::load_reporter::MeasureEndLatencyMs(),
gpr_time_to_millis(final_info->stats.latency)}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
{::grpc::load_reporter::TagKeyHost(),
{target_host_.data(), target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
gpr_free(client_ip_and_lr_token_);
}
grpc_slice_unref_internal(service_method_);
} }
void ServerLoadReportingCallData::StartTransportStreamOpBatch( namespace {
grpc_call_element* elem, TransportStreamOpBatch* op) { std::string GetCensusSafeClientIpString(
GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0); const grpc_core::ClientMetadataHandle& initial_metadata) {
if (op->recv_initial_metadata() != nullptr) {
// Save some fields to use when initial metadata is ready.
peer_string_ = op->get_peer_string();
recv_initial_metadata_ =
op->op()->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
// Substitute the original closure for the wrapper closure.
op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
}
if (op->send_trailing_metadata() != nullptr) {
const auto& costs = op->send_trailing_metadata()->batch()->Take(
grpc_core::LbCostBinMetadata());
for (const auto& cost : costs) {
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureOtherCallMetric(), cost.cost}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
{::grpc::load_reporter::TagKeyHost(),
{target_host_.data(), target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyMetricName(),
{cost.name.data(), cost.name.length()}}});
}
}
grpc_call_next_op(elem, op->op());
}
std::string ServerLoadReportingCallData::GetCensusSafeClientIpString() {
// Find the client URI string. // Find the client URI string.
const char* client_uri_str = auto client_uri_str = initial_metadata->get(grpc_core::PeerString());
reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_)); if (!client_uri_str.has_value()) {
if (client_uri_str == nullptr) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC " "Unable to extract client URI string (peer string) from gRPC "
"metadata."); "metadata.");
return ""; return "";
} }
absl::StatusOr<grpc_core::URI> client_uri = absl::StatusOr<grpc_core::URI> client_uri =
grpc_core::URI::Parse(client_uri_str); grpc_core::URI::Parse(*client_uri_str);
if (!client_uri.ok()) { if (!client_uri.ok()) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Unable to parse the client URI string (peer string) to a client " "Unable to parse the client URI string (peer string) to a client "
@ -184,95 +122,28 @@ std::string ServerLoadReportingCallData::GetCensusSafeClientIpString() {
} }
} }
void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token, std::string MakeClientIpAndLrToken(
size_t lr_token_len) { absl::string_view lr_token,
std::string client_ip = GetCensusSafeClientIpString(); const grpc_core::ClientMetadataHandle& initial_metadata) {
client_ip_and_lr_token_len_ = std::string client_ip = GetCensusSafeClientIpString(initial_metadata);
kLengthPrefixSize + client_ip.size() + lr_token_len; absl::string_view prefix;
client_ip_and_lr_token_ = static_cast<char*>( switch (client_ip.length()) {
gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char))); case 0:
char* cur_pos = client_ip_and_lr_token_; prefix = kEmptyAddressLengthString;
// Store the IP length prefix. break;
if (client_ip.empty()) { case 8:
strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize); prefix = kEncodedIpv4AddressLengthString;
} else if (client_ip.size() == 8) { break;
strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize); case 32:
} else if (client_ip.size() == 32) { prefix = kEncodedIpv6AddressLengthString;
strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize); break;
} else { default:
GPR_UNREACHABLE_CODE(); GPR_UNREACHABLE_CODE();
}
cur_pos += kLengthPrefixSize;
// Store the IP.
if (!client_ip.empty()) {
strncpy(cur_pos, client_ip.c_str(), client_ip.size());
}
cur_pos += client_ip.size();
// Store the LR token.
if (lr_token_len != 0) {
strncpy(cur_pos, lr_token, lr_token_len);
}
GPR_ASSERT(
static_cast<size_t>(cur_pos + lr_token_len - client_ip_and_lr_token_) ==
client_ip_and_lr_token_len_);
}
void ServerLoadReportingCallData::RecvInitialMetadataReady(
void* arg, grpc_error_handle err) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
ServerLoadReportingCallData* calld =
reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
if (err == GRPC_ERROR_NONE) {
if (const grpc_core::Slice* path =
calld->recv_initial_metadata_->get_pointer(
grpc_core::HttpPathMetadata())) {
calld->service_method_ = path->Ref().TakeCSlice();
}
if (const grpc_core::Slice* authority =
calld->recv_initial_metadata_->get_pointer(
grpc_core::HttpAuthorityMetadata())) {
calld->target_host_ = absl::AsciiStrToLower(authority->as_string_view());
}
std::string buffer;
auto lb_token =
calld->recv_initial_metadata_->Take(grpc_core::LbTokenMetadata());
if (lb_token.has_value()) {
if (calld->client_ip_and_lr_token_ == nullptr) {
calld->StoreClientIpAndLrToken(
reinterpret_cast<const char*>(lb_token->data()), lb_token->size());
}
}
// If the LB token was not found in the recv_initial_metadata, only the
// client IP part will be recorded (with an empty LB token).
if (calld->client_ip_and_lr_token_ == nullptr) {
calld->StoreClientIpAndLrToken(nullptr, 0);
}
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureStartCount(), 1}},
{{::grpc::load_reporter::TagKeyToken(),
{calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
{::grpc::load_reporter::TagKeyHost(),
{calld->target_host_.data(), calld->target_host_.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{chand->peer_identity(), chand->peer_identity_len()}}});
} }
grpc_core::Closure::Run(DEBUG_LOCATION, return absl::StrCat(prefix, client_ip, lr_token);
calld->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(err));
} }
grpc_error_handle ServerLoadReportingCallData::Init( const char* GetStatusTagForStatus(grpc_status_code status) {
grpc_call_element* elem, const grpc_call_element_args* /*args*/) {
service_method_ = grpc_empty_slice();
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
const char* ServerLoadReportingCallData::GetStatusTagForStatus(
grpc_status_code status) {
switch (status) { switch (status) {
case GRPC_STATUS_OK: case GRPC_STATUS_OK:
return grpc::load_reporter::kCallStatusOk; return grpc::load_reporter::kCallStatusOk;
@ -287,12 +158,105 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus(
return grpc::load_reporter::kCallStatusClientError; return grpc::load_reporter::kCallStatusClientError;
} }
} }
} // namespace
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
ServerLoadReportingFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
// Gather up basic facts about the request
grpc_core::Slice service_method;
if (const grpc_core::Slice* path =
call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata())) {
service_method = path->Ref();
}
std::string target_host;
if (const grpc_core::Slice* authority =
call_args.client_initial_metadata->get_pointer(
grpc_core::HttpAuthorityMetadata())) {
target_host = absl::AsciiStrToLower(authority->as_string_view());
}
std::string client_ip_and_lr_token;
auto lb_token =
call_args.client_initial_metadata->Take(grpc_core::LbTokenMetadata())
.value_or(grpc_core::Slice());
client_ip_and_lr_token = MakeClientIpAndLrToken(
lb_token.as_string_view(), call_args.client_initial_metadata);
// Record the beginning of the request
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureStartCount(), 1}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(), client_ip_and_lr_token.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}}});
// Returned promise runs the rest of the request, then reports costs and
// records measurements
return grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>(
grpc_core::Seq(
// Call down the stack
next_promise_factory(std::move(call_args)),
// And then record the call result
[this, client_ip_and_lr_token,
target_host](grpc_core::ServerMetadataHandle trailing_metadata) {
const auto& costs =
trailing_metadata->Take(grpc_core::LbCostBinMetadata());
for (const auto& cost : costs) {
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureOtherCallMetric(),
cost.cost}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(),
client_ip_and_lr_token.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}},
{::grpc::load_reporter::TagKeyMetricName(),
{cost.name.data(), cost.name.length()}}});
}
grpc_core::GetContext<grpc_core::CallFinalization>()->Add(
[this, client_ip_and_lr_token,
target_host](const grpc_call_final_info* final_info) {
if (final_info == nullptr) return;
// After the last bytes have been placed on the wire we record
// final measurements
opencensus::stats::Record(
{{::grpc::load_reporter::MeasureEndCount(), 1},
{::grpc::load_reporter::MeasureEndBytesSent(),
final_info->stats.transport_stream_stats.outgoing
.data_bytes},
{::grpc::load_reporter::MeasureEndBytesReceived(),
final_info->stats.transport_stream_stats.incoming
.data_bytes},
{::grpc::load_reporter::MeasureEndLatencyMs(),
gpr_time_to_millis(final_info->stats.latency)}},
{{::grpc::load_reporter::TagKeyToken(),
{client_ip_and_lr_token.data(),
client_ip_and_lr_token.length()}},
{::grpc::load_reporter::TagKeyHost(),
{target_host.data(), target_host.length()}},
{::grpc::load_reporter::TagKeyUserId(),
{peer_identity_.data(), peer_identity_.length()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
});
return grpc_core::Immediate(std::move(trailing_metadata));
}));
}
namespace { namespace {
bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) { bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
return grpc_channel_arg_get_bool( return grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false); grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
} }
const grpc_channel_filter kFilter =
grpc_core::MakePromiseBasedFilter<ServerLoadReportingFilter,
grpc_core::FilterEndpoint::kServer>(
"server_load_reporting");
} // namespace } // namespace
// TODO(juanlishen): We should register the filter during grpc initialization // TODO(juanlishen): We should register the filter during grpc initialization
@ -301,21 +265,26 @@ bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
// time if we build with the filter target. // time if we build with the filter target.
struct ServerLoadReportingFilterStaticRegistrar { struct ServerLoadReportingFilterStaticRegistrar {
ServerLoadReportingFilterStaticRegistrar() { ServerLoadReportingFilterStaticRegistrar() {
static std::atomic<bool> registered{false}; grpc_core::CoreConfiguration::RegisterBuilder(
if (registered.load(std::memory_order_acquire)) return; [](grpc_core::CoreConfiguration::Builder* builder) {
RegisterChannelFilter<ServerLoadReportingChannelData, // Access measures to ensure they are initialized. Otherwise, we can't
ServerLoadReportingCallData>( // create any valid view before the first RPC.
"server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX, grpc::load_reporter::MeasureStartCount();
MaybeAddServerLoadReportingFilter); grpc::load_reporter::MeasureEndCount();
// Access measures to ensure they are initialized. Otherwise, we can't grpc::load_reporter::MeasureEndBytesSent();
// create any valid view before the first RPC. grpc::load_reporter::MeasureEndBytesReceived();
grpc::load_reporter::MeasureStartCount(); grpc::load_reporter::MeasureEndLatencyMs();
grpc::load_reporter::MeasureEndCount(); grpc::load_reporter::MeasureOtherCallMetric();
grpc::load_reporter::MeasureEndBytesSent(); builder->channel_init()->RegisterStage(
grpc::load_reporter::MeasureEndBytesReceived(); GRPC_SERVER_CHANNEL, INT_MAX,
grpc::load_reporter::MeasureEndLatencyMs(); [](grpc_core::ChannelStackBuilder* cs_builder) {
grpc::load_reporter::MeasureOtherCallMetric(); if (MaybeAddServerLoadReportingFilter(
registered.store(true, std::memory_order_release); *cs_builder->channel_args())) {
cs_builder->PrependFilter(&kFilter, nullptr);
}
return true;
});
});
} }
} server_load_reporting_filter_static_registrar; } server_load_reporting_filter_static_registrar;

@ -24,85 +24,27 @@
#include <string> #include <string>
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/cpp/common/channel_filter.h" #include "src/core/lib/channel/promise_based_filter.h"
namespace grpc { namespace grpc {
class ServerLoadReportingChannelData : public ChannelData { class ServerLoadReportingFilter : public grpc_core::ChannelFilter {
public: public:
grpc_error_handle Init(grpc_channel_element* elem, static absl::StatusOr<ServerLoadReportingFilter> Create(
grpc_channel_element_args* args) override; const grpc_channel_args* args, grpc_core::ChannelFilter::Args);
// Getters. // Getters.
const char* peer_identity() { return peer_identity_; } const char* peer_identity() { return peer_identity_.c_str(); }
size_t peer_identity_len() { return peer_identity_len_; } size_t peer_identity_len() { return peer_identity_.length(); }
private: // Construct a promise for one call.
// The peer's authenticated identity. grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
char* peer_identity_ = nullptr; grpc_core::CallArgs call_args,
size_t peer_identity_len_ = 0; grpc_core::NextPromiseFactory next_promise_factory) override;
};
class ServerLoadReportingCallData : public CallData {
public:
grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) override;
void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
private: private:
// From the peer_string_ in calld, extracts the client IP string (owned by // The peer's authenticated identity.
// caller), e.g., "01020a0b". Upon failure, returns empty string. std::string peer_identity_;
std::string GetCensusSafeClientIpString();
// Concatenates the client IP address and the load reporting token, then
// stores the result into the call data.
void StoreClientIpAndLrToken(const char* lr_token, size_t lr_token_len);
// This matches the classification of the status codes in
// googleapis/google/rpc/code.proto.
static const char* GetStatusTagForStatus(grpc_status_code status);
// Records the call start.
static void RecvInitialMetadataReady(void* arg, grpc_error_handle err);
// The peer string (a member of the recv_initial_metadata op). Note that
// gpr_atm itself is a pointer type here, making "peer_string_" effectively a
// double pointer.
const gpr_atm* peer_string_;
// The received initial metadata (a member of the recv_initial_metadata op).
// When it is ready, we will extract some data from it via
// recv_initial_metadata_ready_ closure, before the original
// recv_initial_metadata_ready closure.
grpc_metadata_batch* recv_initial_metadata_;
// The original recv_initial_metadata closure, which is wrapped by our own
// closure (recv_initial_metadata_ready_) to capture the incoming initial
// metadata.
grpc_closure* original_recv_initial_metadata_ready_;
// The closure that wraps the original closure. Scheduled when
// recv_initial_metadata_ is ready.
grpc_closure recv_initial_metadata_ready_;
// Corresponds to the :path header.
grpc_slice service_method_;
// The backend host that the client thinks it's talking to. This may be
// different from the actual backend in the case of, for example,
// load-balanced targets. We store a copy of the metadata slice in order to
// lowercase it. */
std::string target_host_;
// The client IP address (including a length prefix) and the load reporting
// token.
char* client_ip_and_lr_token_;
size_t client_ip_and_lr_token_len_;
}; };
} // namespace grpc } // namespace grpc

@ -1963,8 +1963,8 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
// INPUT PROCESSING - GENERAL // INPUT PROCESSING - GENERAL
// //
void grpc_chttp2_maybe_complete_recv_initial_metadata( void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) { grpc_chttp2_stream* s) {
if (s->recv_initial_metadata_ready != nullptr && if (s->recv_initial_metadata_ready != nullptr &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) { if (s->seen_error) {
@ -1975,6 +1975,7 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(
} }
} }
*s->recv_initial_metadata = std::move(s->initial_metadata_buffer); *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
s->recv_initial_metadata->Set(grpc_core::PeerString(), t->peer_string);
// If we didn't receive initial metadata from the wire and instead faked a // If we didn't receive initial metadata from the wire and instead faked a
// status (due to stream cancellations for example), let upper layers know // status (due to stream cancellations for example), let upper layers know
// that trailing metadata is immediately available. // that trailing metadata is immediately available.
@ -2070,6 +2071,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
grpc_transport_move_stats(&s->stats, s->collecting_stats); grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = nullptr; s->collecting_stats = nullptr;
*s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer); *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
s->recv_trailing_metadata->Set(grpc_core::PeerString(), t->peer_string);
null_then_sched_closure(&s->recv_trailing_metadata_finished); null_then_sched_closure(&s->recv_trailing_metadata_finished);
} }
} }

@ -209,7 +209,7 @@ class FilterStackCall final : public Call {
bool is_trailers_only() const override { bool is_trailers_only() const override {
bool result = is_trailers_only_; bool result = is_trailers_only_;
GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.empty()); GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.TransportSize() == 0);
return result; return result;
} }

@ -45,6 +45,8 @@
#include "src/core/lib/transport/parsed_metadata.h" #include "src/core/lib/transport/parsed_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h" #include "src/core/lib/transport/timeout_encoding.h"
struct grpc_call_final_info;
namespace grpc_core { namespace grpc_core {
// grpc-timeout metadata trait. // grpc-timeout metadata trait.
@ -526,6 +528,14 @@ struct GrpcStreamNetworkState {
} }
}; };
// Annotation added by a server transport to note the peer making a request.
struct PeerString {
static absl::string_view DebugKey() { return "PeerString"; }
static constexpr bool kRepeatable = false;
using ValueType = absl::string_view;
static std::string DisplayValue(ValueType x) { return std::string(x); }
};
// Annotation added by various systems to describe the reason for a failure. // Annotation added by various systems to describe the reason for a failure.
struct GrpcStatusContext { struct GrpcStatusContext {
static absl::string_view DebugKey() { return "GrpcStatusContext"; } static absl::string_view DebugKey() { return "GrpcStatusContext"; }
@ -1374,7 +1384,8 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata, grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata,
grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata, grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata,
// Non-encodable things // Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::GrpcStatusContext>; grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext>;
struct grpc_metadata_batch : public grpc_metadata_batch_base { struct grpc_metadata_batch : public grpc_metadata_batch_base {
using grpc_metadata_batch_base::grpc_metadata_batch_base; using grpc_metadata_batch_base::grpc_metadata_batch_base;

@ -145,7 +145,7 @@ TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
while (true) { while (true) {
stream->Read(&response); stream->Read(&response);
if (!response.load().empty()) { if (!response.load().empty()) {
ASSERT_EQ(response.load().size(), 3); ASSERT_EQ(response.load().size(), 3) << response.DebugString();
for (const auto& load : response.load()) { for (const auto& load : response.load()) {
if (load.in_progress_report_case()) { if (load.in_progress_report_case()) {
// The special load record that reports the number of in-progress // The special load record that reports the number of in-progress

Loading…
Cancel
Save