[tracing] Add latent see annotations to the filter stack.

This would allow us to better attribute time spent in different filters.

PiperOrigin-RevId: 679739129
pull/37584/head
Vignesh Babu 4 months ago committed by Copybara-Service
parent 1b11b92182
commit 05d214ee86
  1. 1
      BUILD
  2. 12
      src/core/BUILD
  3. 3
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  4. 7
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 3
      src/core/ext/filters/http/client_authority_filter.cc
  6. 17
      src/core/ext/filters/http/message_compress/compression_filter.cc
  7. 7
      src/core/ext/filters/http/server/http_server_filter.cc
  8. 6
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  9. 25
      src/core/ext/filters/logging/logging_filter.cc
  10. 9
      src/core/ext/filters/message_size/message_size_filter.cc
  11. 2
      src/core/ext/filters/rbac/rbac_filter.cc
  12. 7
      src/core/ext/filters/stateful_session/stateful_session_filter.cc
  13. 35
      src/core/lib/channel/promise_based_filter.cc
  14. 3
      src/core/lib/channel/promise_based_filter.h
  15. 3
      src/core/lib/security/authorization/grpc_server_authz_filter.cc
  16. 7
      src/core/load_balancing/grpclb/client_load_reporting_filter.cc
  17. 8
      src/core/server/server_call_tracer_filter.cc
  18. 3
      src/core/server/server_config_selector_filter.cc
  19. 3
      src/core/service_config/service_config_channel_arg_filter.cc
  20. 22
      src/core/util/latent_see.cc
  21. 7
      src/core/util/latent_see.h

@ -4362,6 +4362,7 @@ grpc_cc_library(
"//src/core:experiments",
"//src/core:grpc_message_size_filter",
"//src/core:latch",
"//src/core:latent_see",
"//src/core:map",
"//src/core:metadata_batch",
"//src/core:percent_encoding",

@ -199,6 +199,7 @@ grpc_cc_library(
"channel_fwd",
"channel_stack_type",
"context",
"latent_see",
"map",
"pipe",
"//:call_tracer",
@ -3539,6 +3540,7 @@ grpc_cc_library(
"context",
"grpc_message_size_filter",
"grpc_service_config",
"latent_see",
"metadata_batch",
"service_config_parser",
"//:channel_arg_names",
@ -3814,6 +3816,7 @@ grpc_cc_library(
"event_engine_context",
"grpc_server_config_selector",
"grpc_service_config",
"latent_see",
"metadata_batch",
"status_helper",
"//:gpr",
@ -3941,6 +3944,7 @@ grpc_cc_library(
"channel_fwd",
"dual_ref_counted",
"endpoint_info_handshaker",
"latent_see",
"load_file",
"metadata_batch",
"ref_counted",
@ -4853,6 +4857,7 @@ grpc_cc_library(
"channel_args",
"channel_fwd",
"channel_stack_type",
"latent_see",
"metadata_batch",
"slice",
"//:channel_arg_names",
@ -4892,6 +4897,7 @@ grpc_cc_library(
"json_args",
"json_object_loader",
"latch",
"latent_see",
"metadata_batch",
"race",
"service_config_parser",
@ -4982,6 +4988,7 @@ grpc_cc_library(
"json",
"json_args",
"json_object_loader",
"latent_see",
"metadata_batch",
"service_config_parser",
"validation_errors",
@ -5021,6 +5028,7 @@ grpc_cc_library(
"json",
"json_args",
"json_object_loader",
"latent_see",
"map",
"metadata_batch",
"pipe",
@ -5128,6 +5136,7 @@ grpc_cc_library(
"json",
"json_args",
"json_object_loader",
"latent_see",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -6893,6 +6902,7 @@ grpc_cc_library(
"channel_stack_type",
"context",
"grpc_sockaddr",
"latent_see",
"metadata_batch",
"resolved_address",
"seq",
@ -6937,6 +6947,7 @@ grpc_cc_library(
"experiments",
"grpc_backend_metric_data",
"grpc_backend_metric_provider",
"latent_see",
"map",
"metadata_batch",
"slice",
@ -7880,6 +7891,7 @@ grpc_cc_library(
"channel_fwd",
"channel_stack_type",
"context",
"latent_see",
"logging_sink",
"map",
"metadata_batch",

@ -43,6 +43,7 @@
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -125,6 +126,8 @@ BackendMetricFilter::Create(const ChannelArgs&, ChannelFilter::Args) {
}
void BackendMetricFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"BackendMetricFilter::Call::OnServerTrailingMetadata");
if (md.get(GrpcCallWasCancelled()).value_or(false)) return;
auto* ctx = MaybeGetContext<BackendMetricProvider>();
if (ctx == nullptr) {

@ -49,6 +49,7 @@
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -113,6 +114,8 @@ Slice UserAgentFromArgs(const ChannelArgs& args,
void HttpClientFilter::Call::OnClientInitialMetadata(ClientMetadata& md,
HttpClientFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpClientFilter::Call::OnClientInitialMetadata");
if (filter->test_only_use_put_requests_) {
md.Set(HttpMethodMetadata(), HttpMethodMetadata::kPut);
} else {
@ -126,11 +129,15 @@ void HttpClientFilter::Call::OnClientInitialMetadata(ClientMetadata& md,
absl::Status HttpClientFilter::Call::OnServerInitialMetadata(
ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpClientFilter::Call::OnServerInitialMetadata");
return CheckServerMetadata(&md);
}
absl::Status HttpClientFilter::Call::OnServerTrailingMetadata(
ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpClientFilter::Call::OnServerTrailingMetadata");
return CheckServerMetadata(&md);
}

@ -34,6 +34,7 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -59,6 +60,8 @@ ClientAuthorityFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
void ClientAuthorityFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientAuthorityFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientAuthorityFilter::Call::OnClientInitialMetadata");
// If no authority is set, set the default authority.
if (md.get_pointer(HttpAuthorityMetadata()) == nullptr) {
md.Set(HttpAuthorityMetadata(), filter->default_authority_.Ref());

@ -51,6 +51,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -238,48 +239,64 @@ ChannelCompression::DecompressArgs ChannelCompression::HandleIncomingMetadata(
void ClientCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnClientInitialMetadata");
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
MessageHandle ClientCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ClientCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnClientToServerMessage");
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}
void ClientCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ClientCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnServerInitialMetadata");
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ClientCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnServerToClientMessage");
return filter->compression_engine_.DecompressMessage(
/*is_client=*/true, std::move(message), decompress_args_);
}
void ServerCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnClientInitialMetadata");
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ServerCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnClientToServerMessage");
return filter->compression_engine_.DecompressMessage(
/*is_client=*/false, std::move(message), decompress_args_);
}
void ServerCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ServerCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnServerInitialMetadata");
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
MessageHandle ServerCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ServerCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnServerToClientMessage");
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}

@ -45,6 +45,7 @@
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -77,6 +78,8 @@ ServerMetadataHandle MalformedRequest(absl::string_view explanation) {
ServerMetadataHandle HttpServerFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, HttpServerFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpServerFilter::Call::OnClientInitialMetadata");
auto method = md.get(HttpMethodMetadata());
if (method.has_value()) {
switch (*method) {
@ -139,6 +142,8 @@ ServerMetadataHandle HttpServerFilter::Call::OnClientInitialMetadata(
}
void HttpServerFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpServerFilter::Call::OnServerInitialMetadata");
GRPC_TRACE_LOG(call, INFO)
<< GetContext<Activity>()->DebugTag() << "[http-server] Write metadata";
FilterOutgoingMetadata(&md);
@ -147,6 +152,8 @@ void HttpServerFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
}
void HttpServerFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"HttpServerFilter::Call::OnServerTrailingMetadata");
FilterOutgoingMetadata(&md);
}

@ -61,6 +61,7 @@
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/uri.h"
#include "src/cpp/server/load_reporter/constants.h"
@ -181,6 +182,8 @@ const char* GetStatusTagForStatus(grpc_status_code status) {
void ServerLoadReportingFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerLoadReportingFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoadReportingFilter::Call::OnClientInitialMetadata");
// Gather up basic facts about the request
Slice service_method;
if (const Slice* path = md.get_pointer(HttpPathMetadata())) {
@ -205,6 +208,8 @@ void ServerLoadReportingFilter::Call::OnClientInitialMetadata(
void ServerLoadReportingFilter::Call::OnServerTrailingMetadata(
ServerMetadata& md, ServerLoadReportingFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoadReportingFilter::Call::OnServerTrailingMetadata");
const auto& costs = md.Take(LbCostBinMetadata());
for (const auto& cost : costs) {
opencensus::stats::Record(
@ -222,6 +227,7 @@ void ServerLoadReportingFilter::Call::OnServerTrailingMetadata(
void ServerLoadReportingFilter::Call::OnFinalize(
const grpc_call_final_info* final_info, ServerLoadReportingFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE("ServerLoadReportingFilter::Call::OnFinalize");
if (final_info == nullptr) return;
// After the last bytes have been placed on the wire we record
// final measurements

@ -67,6 +67,7 @@
#include "src/core/resolver/resolver_registry.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/host_port.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/time.h"
#include "src/core/util/uri.h"
@ -359,6 +360,8 @@ ClientLoggingFilter::Create(const ChannelArgs& args,
void ClientLoggingFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientLoggingFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnClientInitialMetadata");
call_data_.emplace(true, md, filter->default_authority_);
if (!call_data_->ShouldLog()) {
call_data_.reset();
@ -369,6 +372,8 @@ void ClientLoggingFilter::Call::OnClientInitialMetadata(
}
void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnServerInitialMetadata");
if (!call_data_.has_value()) return;
call_data_->LogServerHeader(
/*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
@ -376,6 +381,8 @@ void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
}
void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnServerTrailingMetadata");
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
@ -390,6 +397,8 @@ void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
void ClientLoggingFilter::Call::OnClientToServerMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnClientToServerMessage");
if (!call_data_.has_value()) return;
call_data_->LogClientMessage(
/*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
@ -397,6 +406,8 @@ void ClientLoggingFilter::Call::OnClientToServerMessage(
}
void ClientLoggingFilter::Call::OnClientToServerHalfClose() {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnClientToServerHalfClose");
if (!call_data_.has_value()) return;
call_data_->LogClientHalfClose(
/*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>());
@ -404,6 +415,8 @@ void ClientLoggingFilter::Call::OnClientToServerHalfClose() {
void ClientLoggingFilter::Call::OnServerToClientMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoggingFilter::Call::OnServerToClientMessage");
if (!call_data_.has_value()) return;
call_data_->LogServerMessage(
/*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
@ -424,6 +437,8 @@ ServerLoggingFilter::Create(const ChannelArgs& /*args*/,
// Construct a promise for one call.
void ServerLoggingFilter::Call::OnClientInitialMetadata(ClientMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnClientInitialMetadata");
call_data_.emplace(false, md, "");
if (!call_data_->ShouldLog()) {
call_data_.reset();
@ -435,6 +450,8 @@ void ServerLoggingFilter::Call::OnClientInitialMetadata(ClientMetadata& md) {
}
void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnServerInitialMetadata");
if (!call_data_.has_value()) return;
call_data_->LogServerHeader(
/*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
@ -442,6 +459,8 @@ void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
}
void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnServerTrailingMetadata");
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
@ -456,6 +475,8 @@ void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
void ServerLoggingFilter::Call::OnClientToServerMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnClientToServerMessage");
if (!call_data_.has_value()) return;
call_data_->LogClientMessage(
/*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
@ -463,6 +484,8 @@ void ServerLoggingFilter::Call::OnClientToServerMessage(
}
void ServerLoggingFilter::Call::OnClientToServerHalfClose() {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnClientToServerHalfClose");
if (!call_data_.has_value()) return;
call_data_->LogClientHalfClose(
/*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>());
@ -470,6 +493,8 @@ void ServerLoggingFilter::Call::OnClientToServerHalfClose() {
void ServerLoggingFilter::Call::OnServerToClientMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerLoggingFilter::Call::OnServerToClientMessage");
if (!call_data_.has_value()) return;
call_data_->LogServerMessage(
/*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),

@ -44,6 +44,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -201,24 +202,32 @@ ClientMessageSizeFilter::Call::Call(ClientMessageSizeFilter* filter)
ServerMetadataHandle ServerMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerMessageSizeFilter::Call::OnClientToServerMessage");
return CheckPayload(message, filter->parsed_config_.max_recv_size(),
/*is_client=*/false, false);
}
ServerMetadataHandle ServerMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message, ServerMessageSizeFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerMessageSizeFilter::Call::OnServerToClientMessage");
return CheckPayload(message, filter->parsed_config_.max_send_size(),
/*is_client=*/false, true);
}
ServerMetadataHandle ClientMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientMessageSizeFilter::Call::OnClientToServerMessage");
return CheckPayload(message, limits_.max_send_size(), /*is_client=*/true,
true);
}
ServerMetadataHandle ClientMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientMessageSizeFilter::Call::OnServerToClientMessage");
return CheckPayload(message, limits_.max_recv_size(), /*is_client=*/true,
false);
}

@ -39,6 +39,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -51,6 +52,7 @@ const NoInterceptor RbacFilter::Call::OnFinalize;
absl::Status RbacFilter::Call::OnClientInitialMetadata(ClientMetadata& md,
RbacFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE("RbacFilter::Call::OnClientInitialMetadata");
// Fetch and apply the rbac policy from the service config.
auto* service_config_call_data = GetContext<ServiceConfigCallData>();
auto* method_params = static_cast<RbacMethodParsedConfig*>(

@ -51,6 +51,7 @@
#include "src/core/resolver/xds/xds_resolver_attributes.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/util/crash.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/time.h"
namespace grpc_core {
@ -219,6 +220,8 @@ bool IsConfiguredPath(absl::string_view configured_path,
void StatefulSessionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, StatefulSessionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"StatefulSessionFilter::Call::OnClientInitialMetadata");
// Get config.
auto* service_config_call_data = GetContext<ServiceConfigCallData>();
CHECK_NE(service_config_call_data, nullptr);
@ -258,6 +261,8 @@ void StatefulSessionFilter::Call::OnClientInitialMetadata(
}
void StatefulSessionFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"StatefulSessionFilter::Call::OnServerInitialMetadata");
if (!perform_filtering_) return;
// Add cookie to server initial metadata if needed.
MaybeUpdateServerInitialMetadata(cookie_config_, cluster_changed_,
@ -266,6 +271,8 @@ void StatefulSessionFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
}
void StatefulSessionFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
GRPC_LATENT_SEE_INNER_SCOPE(
"StatefulSessionFilter::Call::OnServerTrailingMetadata");
if (!perform_filtering_) return;
// If we got a Trailers-Only response, then add the
// cookie to the trailing metadata instead of the

@ -38,6 +38,7 @@
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/util/crash.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/manual_constructor.h"
#include "src/core/util/status_helper.h"
@ -242,10 +243,8 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
///////////////////////////////////////////////////////////////////////////////
// BaseCallData::Flusher
BaseCallData::Flusher::Flusher(BaseCallData* call)
: latent_see::InnerScope(
GRPC_LATENT_SEE_METADATA("PromiseBasedFilter Flusher")),
call_(call) {
BaseCallData::Flusher::Flusher(BaseCallData* call, const char* desc)
: latent_see::InnerScope(GRPC_LATENT_SEE_METADATA_RAW(desc)), call_(call) {
GRPC_CALL_STACK_REF(call_->call_stack(), "flusher");
}
@ -397,7 +396,7 @@ bool BaseCallData::SendMessage::IsIdle() const {
}
void BaseCallData::SendMessage::OnComplete(absl::Status status) {
Flusher flusher(base_);
Flusher flusher(base_, "SendMessage::OnComplete");
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_)
<< " status=" << status;
@ -707,7 +706,7 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) {
break;
}
completed_status_ = status;
Flusher flusher(base_);
Flusher flusher(base_, "ReceiveMessage::OnComplete");
ScopedContext ctx(base_);
base_->WakeInsideCombiner(&flusher);
}
@ -1221,7 +1220,8 @@ class ClientCallData::PollContext {
auto* next_poll = static_cast<NextPoll*>(p);
{
ScopedContext ctx(next_poll->call_data);
Flusher flusher(next_poll->call_data);
Flusher flusher(next_poll->call_data,
"ClientCallData::PollContext::~PollContext");
next_poll->call_data->WakeInsideCombiner(&flusher);
}
GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll");
@ -1350,7 +1350,7 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Fake out the activity based context.
ScopedContext context(this);
CapturedBatch batch(b);
Flusher flusher(this);
Flusher flusher(this, "ClientCallData::StartBatch");
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch " << DebugString();
@ -1556,7 +1556,7 @@ void ClientCallData::RecvInitialMetadataReady(grpc_error_handle error) {
<< DebugString() << " error:" << error.ToString()
<< " md:" << recv_initial_metadata_->metadata->DebugString();
ScopedContext context(this);
Flusher flusher(this);
Flusher flusher(this, "ClientCallData::RecvInitialMetadataReady");
if (!error.ok()) {
switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kHookedWaitingForPipe:
@ -1742,7 +1742,7 @@ void ClientCallData::RecvTrailingMetadataReadyCallback(
}
void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
Flusher flusher(this);
Flusher flusher(this, "ClientCallData::RecvTrailingMetadataReady");
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << " ClientCallData.RecvTrailingMetadataReady "
<< "recv_trailing_state=" << StateString(recv_trailing_state_)
@ -1793,11 +1793,12 @@ void ClientCallData::SetStatusFromError(grpc_metadata_batch* metadata,
// Wakeup and poll the promise if appropriate.
void ClientCallData::WakeInsideCombiner(Flusher* flusher) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCallData::WakeInsideCombiner");
PollContext(this, flusher).Run();
}
void ClientCallData::OnWakeup() {
Flusher flusher(this);
Flusher flusher(this, "ClientCallData::OnWakeup");
ScopedContext context(this);
WakeInsideCombiner(&flusher);
}
@ -1873,7 +1874,8 @@ class ServerCallData::PollContext {
auto run = [](void* p, grpc_error_handle) {
auto* next_poll = static_cast<NextPoll*>(p);
{
Flusher flusher(next_poll->call_data);
Flusher flusher(next_poll->call_data,
"ServerCallData::PollContext::~PollContext");
ScopedContext context(next_poll->call_data);
next_poll->call_data->WakeInsideCombiner(&flusher);
}
@ -1977,7 +1979,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Fake out the activity based context.
ScopedContext context(this);
CapturedBatch batch(b);
Flusher flusher(this);
Flusher flusher(this, "ServerCallData::StartBatch");
bool wake = false;
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch: " << DebugString();
@ -2266,7 +2268,7 @@ void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": RecvTrailingMetadataReady error=" << error
<< " md=" << recv_trailing_metadata_->DebugString();
Flusher flusher(this);
Flusher flusher(this, "ServerCallData::RecvTrailingMetadataReady");
PollContext poll_ctx(this, &flusher);
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(),
&flusher);
@ -2280,7 +2282,7 @@ void ServerCallData::RecvInitialMetadataReadyCallback(void* arg,
}
void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
Flusher flusher(this);
Flusher flusher(this, "ServerCallData::RecvInitialMetadataReady");
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": RecvInitialMetadataReady " << error;
CHECK(recv_initial_state_ == RecvInitialState::kForwarded);
@ -2343,6 +2345,7 @@ std::string ServerCallData::DebugString() const {
// Wakeup and poll the promise if appropriate.
void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
GRPC_LATENT_SEE_INNER_SCOPE("ServerCallData::WakeInsideCombiner");
PollContext poll_ctx(this, flusher);
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": WakeInsideCombiner " << DebugString();
@ -2494,7 +2497,7 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
}
void ServerCallData::OnWakeup() {
Flusher flusher(this);
Flusher flusher(this, "ServerCallData::OnWakeup");
ScopedContext context(this);
WakeInsideCombiner(&flusher);
}

@ -976,7 +976,8 @@ class BaseCallData : public Activity, private Wakeable {
class Flusher : public latent_see::InnerScope {
public:
explicit Flusher(BaseCallData* call);
explicit Flusher(BaseCallData* call,
const char* desc = "PromiseBasedFilter::Flusher");
// Calls closures, schedules batches, relinquishes call combiner.
~Flusher();

@ -33,6 +33,7 @@
#include "src/core/lib/security/authorization/evaluate_args.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -99,6 +100,8 @@ bool GrpcServerAuthzFilter::IsAuthorized(ClientMetadata& initial_metadata) {
absl::Status GrpcServerAuthzFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, GrpcServerAuthzFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"GrpcServerAuthzFilter::Call::OnClientInitialMetadata");
if (!filter->IsAuthorized(md)) {
return absl::PermissionDeniedError("Unauthorized RPC request rejected.");
}

@ -35,6 +35,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/load_balancing/grpclb/grpclb_client_stats.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/ref_counted_ptr.h"
namespace grpc_core {
@ -55,6 +56,8 @@ ClientLoadReportingFilter::Create(const ChannelArgs&, ChannelFilter::Args) {
void ClientLoadReportingFilter::Call::OnClientInitialMetadata(
ClientMetadata& client_initial_metadata) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoadReportingFilter::Call::OnClientInitialMetadata");
// Handle client initial metadata.
// Grab client stats object from metadata.
auto client_stats_md =
@ -65,11 +68,15 @@ void ClientLoadReportingFilter::Call::OnClientInitialMetadata(
}
void ClientLoadReportingFilter::Call::OnServerInitialMetadata(ServerMetadata&) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoadReportingFilter::Call::OnServerInitialMetadata");
saw_initial_metadata_ = true;
}
void ClientLoadReportingFilter::Call::OnServerTrailingMetadata(
ServerMetadata& server_trailing_metadata) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientLoadReportingFilter::Call::OnServerTrailingMetadata");
if (client_stats_ != nullptr) {
client_stats_->AddCallFinished(
server_trailing_metadata.get(GrpcStreamNetworkState()) ==

@ -39,6 +39,7 @@
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/latent_see.h"
namespace grpc_core {
@ -57,24 +58,31 @@ class ServerCallTracerFilter
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& client_initial_metadata) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCallTracerFilter::Call::OnClientInitialMetadata");
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordReceivedInitialMetadata(&client_initial_metadata);
}
void OnServerInitialMetadata(ServerMetadata& server_initial_metadata) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCallTracerFilter::Call::OnServerInitialMetadata");
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordSendInitialMetadata(&server_initial_metadata);
}
void OnFinalize(const grpc_call_final_info* final_info) {
GRPC_LATENT_SEE_INNER_SCOPE("ServerCallTracerFilter::Call::OnFinalize");
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordEnd(final_info);
}
void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCallTracerFilter::Call::OnServerTrailingMetadata");
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordSendTrailingMetadata(&server_trailing_metadata);

@ -38,6 +38,7 @@
#include "src/core/server/server_config_selector.h"
#include "src/core/service_config/service_config.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/sync.h"
@ -144,6 +145,8 @@ void ServerConfigSelectorFilter::Orphan() {
absl::Status ServerConfigSelectorFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerConfigSelectorFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerConfigSelectorFilter::Call::OnClientInitialMetadata");
auto sel = filter->config_selector();
if (!sel.ok()) return sel.status();
auto call_config = sel.value()->GetCallConfig(&md);

@ -46,6 +46,7 @@
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/service_config/service_config_parser.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/ref_counted_ptr.h"
namespace grpc_core {
@ -107,6 +108,8 @@ const NoInterceptor ServiceConfigChannelArgFilter::Call::OnFinalize;
void ServiceConfigChannelArgFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServiceConfigChannelArgFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServiceConfigChannelArgFilter::Call::OnClientInitialMetadata");
const ServiceConfigParser::ParsedConfigVector* method_configs = nullptr;
if (filter->service_config_ != nullptr) {
method_configs = filter->service_config_->GetMethodParsedConfigVector(

@ -91,12 +91,22 @@ std::string Log::GenerateJson() {
}
if (!first) absl::StrAppend(&json, ",\n");
first = false;
absl::StrAppend(&json, "{\"name\": ", event.event.metadata->name,
", \"ph\": \"", phase, "\", \"ts\": ",
std::chrono::duration<double, std::micro>(
event.event.timestamp - *start_time)
.count(),
", \"pid\": 0, \"tid\": ", event.thread_id);
if (event.event.metadata->name[0] != '"') {
absl::StrAppend(&json, "{\"name\": \"", event.event.metadata->name,
"\", \"ph\": \"", phase, "\", \"ts\": ",
std::chrono::duration<unsigned long long, std::nano>(
event.event.timestamp - *start_time)
.count(),
", \"pid\": 0, \"tid\": ", event.thread_id);
} else {
absl::StrAppend(&json, "{\"name\": ", event.event.metadata->name,
", \"ph\": \"", phase, "\", \"ts\": ",
std::chrono::duration<unsigned long long, std::nano>(
event.event.timestamp - *start_time)
.count(),
", \"pid\": 0, \"tid\": ", event.thread_id);
}
if (has_id) {
absl::StrAppend(&json, ", \"id\": ", event.event.id);
}

@ -260,6 +260,12 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
#name}; \
return &metadata; \
}()
#define GRPC_LATENT_SEE_METADATA_RAW(name) \
[ptr = name]() { \
static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \
ptr}; \
return &metadata; \
}()
// Parent scope: logs a begin and end event, and flushes the thread log on scope
// exit. Because the flush takes some time it's better to place one parent scope
// at the top of the stack, and use lighter weight scopes within it.
@ -295,6 +301,7 @@ struct InnerScope {
} // namespace latent_see
} // namespace grpc_core
#define GRPC_LATENT_SEE_METADATA(name) nullptr
#define GRPC_LATENT_SEE_METADATA_RAW(name) nullptr
#define GRPC_LATENT_SEE_PARENT_SCOPE(name) \
do { \
} while (0)

Loading…
Cancel
Save