pull/37815/head
Craig Tiller 5 months ago
parent 3ea3e36e95
commit 606a6350fe
  1. 35
      src/core/lib/channel/promise_based_filter.cc
  2. 3
      src/core/lib/channel/promise_based_filter.h
  3. 6
      src/core/util/latent_see.h

@ -243,8 +243,8 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// BaseCallData::Flusher // BaseCallData::Flusher
BaseCallData::Flusher::Flusher(BaseCallData* call, const char* desc) BaseCallData::Flusher::Flusher(BaseCallData* call, latent_see::Metadata* desc)
: latent_see::InnerScope(GRPC_LATENT_SEE_METADATA_RAW(desc)), call_(call) { : latent_see::InnerScope(desc), call_(call) {
GRPC_CALL_STACK_REF(call_->call_stack(), "flusher"); GRPC_CALL_STACK_REF(call_->call_stack(), "flusher");
} }
@ -396,7 +396,7 @@ bool BaseCallData::SendMessage::IsIdle() const {
} }
void BaseCallData::SendMessage::OnComplete(absl::Status status) { void BaseCallData::SendMessage::OnComplete(absl::Status status) {
Flusher flusher(base_, "SendMessage::OnComplete"); Flusher flusher(base_, GRPC_LATENT_SEE_METADATA("SendMessage::OnComplete"));
GRPC_TRACE_LOG(channel, INFO) GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_) << base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_)
<< " status=" << status; << " status=" << status;
@ -706,7 +706,8 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) {
break; break;
} }
completed_status_ = status; completed_status_ = status;
Flusher flusher(base_, "ReceiveMessage::OnComplete"); Flusher flusher(base_,
GRPC_LATENT_SEE_METADATA("ReceiveMessage::OnComplete"));
ScopedContext ctx(base_); ScopedContext ctx(base_);
base_->WakeInsideCombiner(&flusher); base_->WakeInsideCombiner(&flusher);
} }
@ -1221,7 +1222,8 @@ class ClientCallData::PollContext {
{ {
ScopedContext ctx(next_poll->call_data); ScopedContext ctx(next_poll->call_data);
Flusher flusher(next_poll->call_data, Flusher flusher(next_poll->call_data,
"ClientCallData::PollContext::~PollContext"); GRPC_LATENT_SEE_METADATA(
"ClientCallData::PollContext::~PollContext"));
next_poll->call_data->WakeInsideCombiner(&flusher); next_poll->call_data->WakeInsideCombiner(&flusher);
} }
GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll"); GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll");
@ -1350,7 +1352,7 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Fake out the activity based context. // Fake out the activity based context.
ScopedContext context(this); ScopedContext context(this);
CapturedBatch batch(b); CapturedBatch batch(b);
Flusher flusher(this, "ClientCallData::StartBatch"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA("ClientCallData::StartBatch"));
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch " << DebugString(); GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch " << DebugString();
@ -1556,7 +1558,8 @@ void ClientCallData::RecvInitialMetadataReady(grpc_error_handle error) {
<< DebugString() << " error:" << error.ToString() << DebugString() << " error:" << error.ToString()
<< " md:" << recv_initial_metadata_->metadata->DebugString(); << " md:" << recv_initial_metadata_->metadata->DebugString();
ScopedContext context(this); ScopedContext context(this);
Flusher flusher(this, "ClientCallData::RecvInitialMetadataReady"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA(
"ClientCallData::RecvInitialMetadataReady"));
if (!error.ok()) { if (!error.ok()) {
switch (recv_initial_metadata_->state) { switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kHookedWaitingForPipe: case RecvInitialMetadata::kHookedWaitingForPipe:
@ -1742,7 +1745,8 @@ void ClientCallData::RecvTrailingMetadataReadyCallback(
} }
void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) { void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
Flusher flusher(this, "ClientCallData::RecvTrailingMetadataReady"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA(
"ClientCallData::RecvTrailingMetadataReady"));
GRPC_TRACE_LOG(channel, INFO) GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << " ClientCallData.RecvTrailingMetadataReady " << LogTag() << " ClientCallData.RecvTrailingMetadataReady "
<< "recv_trailing_state=" << StateString(recv_trailing_state_) << "recv_trailing_state=" << StateString(recv_trailing_state_)
@ -1798,7 +1802,7 @@ void ClientCallData::WakeInsideCombiner(Flusher* flusher) {
} }
void ClientCallData::OnWakeup() { void ClientCallData::OnWakeup() {
Flusher flusher(this, "ClientCallData::OnWakeup"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA("ClientCallData::OnWakeup"));
ScopedContext context(this); ScopedContext context(this);
WakeInsideCombiner(&flusher); WakeInsideCombiner(&flusher);
} }
@ -1875,7 +1879,8 @@ class ServerCallData::PollContext {
auto* next_poll = static_cast<NextPoll*>(p); auto* next_poll = static_cast<NextPoll*>(p);
{ {
Flusher flusher(next_poll->call_data, Flusher flusher(next_poll->call_data,
"ServerCallData::PollContext::~PollContext"); GRPC_LATENT_SEE_METADATA(
"ServerCallData::PollContext::~PollContext"));
ScopedContext context(next_poll->call_data); ScopedContext context(next_poll->call_data);
next_poll->call_data->WakeInsideCombiner(&flusher); next_poll->call_data->WakeInsideCombiner(&flusher);
} }
@ -1979,7 +1984,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Fake out the activity based context. // Fake out the activity based context.
ScopedContext context(this); ScopedContext context(this);
CapturedBatch batch(b); CapturedBatch batch(b);
Flusher flusher(this, "ServerCallData::StartBatch"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA("ServerCallData::StartBatch"));
bool wake = false; bool wake = false;
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch: " << DebugString(); GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch: " << DebugString();
@ -2268,7 +2273,8 @@ void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
GRPC_TRACE_LOG(channel, INFO) GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": RecvTrailingMetadataReady error=" << error << LogTag() << ": RecvTrailingMetadataReady error=" << error
<< " md=" << recv_trailing_metadata_->DebugString(); << " md=" << recv_trailing_metadata_->DebugString();
Flusher flusher(this, "ServerCallData::RecvTrailingMetadataReady"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA(
"ServerCallData::RecvTrailingMetadataReady"));
PollContext poll_ctx(this, &flusher); PollContext poll_ctx(this, &flusher);
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(), Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(),
&flusher); &flusher);
@ -2282,7 +2288,8 @@ void ServerCallData::RecvInitialMetadataReadyCallback(void* arg,
} }
void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) { void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
Flusher flusher(this, "ServerCallData::RecvInitialMetadataReady"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA(
"ServerCallData::RecvInitialMetadataReady"));
GRPC_TRACE_LOG(channel, INFO) GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": RecvInitialMetadataReady " << error; << LogTag() << ": RecvInitialMetadataReady " << error;
CHECK(recv_initial_state_ == RecvInitialState::kForwarded); CHECK(recv_initial_state_ == RecvInitialState::kForwarded);
@ -2497,7 +2504,7 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
} }
void ServerCallData::OnWakeup() { void ServerCallData::OnWakeup() {
Flusher flusher(this, "ServerCallData::OnWakeup"); Flusher flusher(this, GRPC_LATENT_SEE_METADATA("ServerCallData::OnWakeup"));
ScopedContext context(this); ScopedContext context(this);
WakeInsideCombiner(&flusher); WakeInsideCombiner(&flusher);
} }

@ -977,7 +977,8 @@ class BaseCallData : public Activity, private Wakeable {
class Flusher : public latent_see::InnerScope { class Flusher : public latent_see::InnerScope {
public: public:
explicit Flusher(BaseCallData* call, explicit Flusher(BaseCallData* call,
const char* desc = "PromiseBasedFilter::Flusher"); latent_see::Metadata* desc = GRPC_LATENT_SEE_METADATA(
"PromiseBasedFilter::Flusher"));
// Calls closures, schedules batches, relinquishes call combiner. // Calls closures, schedules batches, relinquishes call combiner.
~Flusher(); ~Flusher();

@ -260,12 +260,6 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
#name}; \ #name}; \
return &metadata; \ return &metadata; \
}() }()
#define GRPC_LATENT_SEE_METADATA_RAW(name) \
[ptr = name]() { \
static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \
ptr}; \
return &metadata; \
}()
// Parent scope: logs a begin and end event, and flushes the thread log on scope // Parent scope: logs a begin and end event, and flushes the thread log on scope
// exit. Because the flush takes some time it's better to place one parent scope // exit. Because the flush takes some time it's better to place one parent scope
// at the top of the stack, and use lighter weight scopes within it. // at the top of the stack, and use lighter weight scopes within it.

Loading…
Cancel
Save