[chaotic-good] Annotate key promises for latent-see (#37830)

Closes #37830

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37830 from ctiller:promising-latent-see 4f7dd1b7c6
PiperOrigin-RevId: 681228010
pull/37683/head
Craig Tiller 5 months ago committed by Copybara-Service
parent cd129b49c2
commit d6fd0bd991
  1. 30
      src/core/ext/transport/chaotic_good/client_transport.cc
  2. 28
      src/core/ext/transport/chaotic_good/server_transport.cc
  3. 2
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  4. 13
      src/core/lib/transport/promise_endpoint.h
  5. 20
      src/core/util/latent_see.h

@ -117,7 +117,9 @@ auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
}); });
// Wrap the actual sequence with something that owns the call handler so that // Wrap the actual sequence with something that owns the call handler so that
// its lifetime extends until the push completes. // its lifetime extends until the push completes.
return [call_handler, push = std::move(push)]() mutable { return push(); }; return GRPC_LATENT_SEE_PROMISE(
"PushFrameIntoCall",
([call_handler, push = std::move(push)]() mutable { return push(); }));
} }
auto ChaoticGoodClientTransport::TransportReadLoop( auto ChaoticGoodClientTransport::TransportReadLoop(
@ -205,10 +207,14 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport(
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>( party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get()); event_engine.get());
party_ = Party::Make(std::move(party_arena)); party_ = Party::Make(std::move(party_arena));
party_->Spawn("client-chaotic-writer", TransportWriteLoop(transport), party_->Spawn("client-chaotic-writer",
GRPC_LATENT_SEE_PROMISE("ClientTransportWriteLoop",
TransportWriteLoop(transport)),
OnTransportActivityDone("write_loop")); OnTransportActivityDone("write_loop"));
party_->Spawn("client-chaotic-reader", party_->Spawn(
TransportReadLoop(std::move(transport)), "client-chaotic-reader",
GRPC_LATENT_SEE_PROMISE("ClientTransportReadLoop",
TransportReadLoop(std::move(transport))),
OnTransportActivityDone("read_loop")); OnTransportActivityDone("read_loop"));
} }
@ -265,24 +271,28 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
return absl::OkStatus(); return absl::OkStatus();
}); });
}; };
return TrySeq( return GRPC_LATENT_SEE_PROMISE(
"CallOutboundLoop",
TrySeq(
// Wait for initial metadata then send it out. // Wait for initial metadata then send it out.
call_handler.PullClientInitialMetadata(), call_handler.PullClientInitialMetadata(),
[send_fragment](ClientMetadataHandle md) mutable { [send_fragment](ClientMetadataHandle md) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO) GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Sending initial metadata: " << md->DebugString(); << "CHAOTIC_GOOD: Sending initial metadata: "
<< md->DebugString();
ClientFragmentFrame frame; ClientFragmentFrame frame;
frame.headers = std::move(md); frame.headers = std::move(md);
return send_fragment(std::move(frame)); return send_fragment(std::move(frame));
}, },
// Continuously send client frame with client to server messages. // Continuously send client frame with client to server messages.
ForEach(OutgoingMessages(call_handler), ForEach(OutgoingMessages(call_handler),
[send_fragment, [send_fragment, aligned_bytes = aligned_bytes_](
aligned_bytes = aligned_bytes_](MessageHandle message) mutable { MessageHandle message) mutable {
ClientFragmentFrame frame; ClientFragmentFrame frame;
// Construct frame header (flags, header_length and // Construct frame header (flags, header_length and
// trailer_length will be added in serialization). // trailer_length will be added in serialization).
const uint32_t message_length = message->payload()->Length(); const uint32_t message_length =
message->payload()->Length();
const uint32_t padding = const uint32_t padding =
message_length % aligned_bytes == 0 message_length % aligned_bytes == 0
? 0 ? 0
@ -296,7 +306,7 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
ClientFragmentFrame frame; ClientFragmentFrame frame;
frame.end_of_stream = true; frame.end_of_stream = true;
return send_fragment(std::move(frame)); return send_fragment(std::move(frame));
}); }));
} }
void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) { void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {

@ -199,8 +199,9 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
auto ChaoticGoodServerTransport::CallOutboundLoop( auto ChaoticGoodServerTransport::CallOutboundLoop(
uint32_t stream_id, CallInitiator call_initiator) { uint32_t stream_id, CallInitiator call_initiator) {
auto outgoing_frames = outgoing_frames_.MakeSender(); auto outgoing_frames = outgoing_frames_.MakeSender();
return Seq( return GRPC_LATENT_SEE_PROMISE(
Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames, "CallOutboundLoop",
Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
call_initiator), call_initiator),
[stream_id](absl::Status main_body_result) { [stream_id](absl::Status main_body_result) {
GRPC_TRACE_VLOG(chaotic_good, 2) GRPC_TRACE_VLOG(chaotic_good, 2)
@ -216,8 +217,9 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
ServerFragmentFrame frame; ServerFragmentFrame frame;
frame.trailers = std::move(md); frame.trailers = std::move(md);
frame.stream_id = stream_id; frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator); return SendFragment(std::move(frame), outgoing_frames,
}); call_initiator);
}));
} }
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
@ -269,10 +271,12 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
} }
auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) { auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
return TrySeq( return GRPC_LATENT_SEE_PROMISE(
"ReadOneFrame",
TrySeq(
transport.ReadFrameBytes(), transport.ReadFrameBytes(),
[this, transport = [this, transport = &transport](
&transport](std::tuple<FrameHeader, BufferPair> frame_bytes) { std::tuple<FrameHeader, BufferPair> frame_bytes) {
const auto& frame_header = std::get<0>(frame_bytes); const auto& frame_header = std::get<0>(frame_bytes);
auto& buffers = std::get<1>(frame_bytes); auto& buffers = std::get<1>(frame_bytes);
return Switch( return Switch(
@ -319,7 +323,7 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
static_cast<uint8_t>(frame_header.type))); static_cast<uint8_t>(frame_header.type)));
})); }));
}, },
[]() -> LoopCtl<absl::Status> { return Continue{}; }); []() -> LoopCtl<absl::Status> { return Continue{}; }));
} }
auto ChaoticGoodServerTransport::TransportReadLoop( auto ChaoticGoodServerTransport::TransportReadLoop(
@ -360,9 +364,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>( party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get()); event_engine.get());
party_ = Party::Make(std::move(party_arena)); party_ = Party::Make(std::move(party_arena));
party_->Spawn("server-chaotic-writer", TransportWriteLoop(transport), party_->Spawn("server-chaotic-writer",
GRPC_LATENT_SEE_PROMISE("ServerTransportWriteLoop",
TransportWriteLoop(transport)),
OnTransportActivityDone("writer")); OnTransportActivityDone("writer"));
party_->Spawn("server-chaotic-reader", TransportReadLoop(transport), party_->Spawn("server-chaotic-reader",
GRPC_LATENT_SEE_PROMISE("ServerTransportReadLoop",
TransportReadLoop(transport)),
OnTransportActivityDone("reader")); OnTransportActivityDone("reader"));
} }

@ -102,7 +102,7 @@ namespace {
// of bytes sent. // of bytes sent.
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno, ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) { int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpSend"); GRPC_LATENT_SEE_PARENT_SCOPE("TcpSend");
ssize_t sent_length; ssize_t sent_length;
do { do {
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);

@ -106,7 +106,8 @@ class PromiseEndpoint {
return absl::OkStatus(); return absl::OkStatus();
}; };
}, },
[this]() { GRPC_LATENT_SEE_PROMISE(
"DelayedWrite", ([this]() {
return [write_state = write_state_]() -> Poll<absl::Status> { return [write_state = write_state_]() -> Poll<absl::Status> {
// If current write isn't finished return `Pending()`, else // If current write isn't finished return `Pending()`, else
// return write result. // return write result.
@ -123,7 +124,7 @@ class PromiseEndpoint {
CHECK(expected == WriteState::kWriting); CHECK(expected == WriteState::kWriting);
return Pending(); return Pending();
}; };
}); })));
} }
// Returns a promise that resolves to `SliceBuffer` with // Returns a promise that resolves to `SliceBuffer` with
@ -174,13 +175,15 @@ class PromiseEndpoint {
return std::move(ret); return std::move(ret);
}; };
}, },
[this, num_bytes]() { GRPC_LATENT_SEE_PROMISE(
"DelayedRead", ([this, num_bytes]() {
return [read_state = read_state_, return [read_state = read_state_,
num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> { num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
if (!read_state->complete.load(std::memory_order_acquire)) { if (!read_state->complete.load(std::memory_order_acquire)) {
return Pending(); return Pending();
} }
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes. // If read succeeds, return `SliceBuffer` with `num_bytes`
// bytes.
if (read_state->result.ok()) { if (read_state->result.ok()) {
SliceBuffer ret; SliceBuffer ret;
grpc_slice_buffer_move_first_no_inline( grpc_slice_buffer_move_first_no_inline(
@ -192,7 +195,7 @@ class PromiseEndpoint {
read_state->complete.store(false, std::memory_order_relaxed); read_state->complete.store(false, std::memory_order_relaxed);
return std::move(read_state->result); return std::move(read_state->result);
}; };
}); })));
} }
// Returns a promise that resolves to `Slice` with at least // Returns a promise that resolves to `Slice` with at least

@ -252,12 +252,26 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
Log::CurrentThreadBin()->Append(md, EventType::kMark, 0); Log::CurrentThreadBin()->Append(md, EventType::kMark, 0);
} }
template <typename P>
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto Promise(const Metadata* md_poll,
const Metadata* md_flow,
P promise) {
return [md_poll, md_flow, promise = std::move(promise),
flow = Flow(md_flow)]() mutable {
InnerScope scope(md_poll);
flow.End();
auto r = promise();
flow.Begin(md_flow);
return r;
};
}
} // namespace latent_see } // namespace latent_see
} // namespace grpc_core } // namespace grpc_core
#define GRPC_LATENT_SEE_METADATA(name) \ #define GRPC_LATENT_SEE_METADATA(name) \
[]() { \ []() { \
static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \ static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \
#name}; \ name}; \
return &metadata; \ return &metadata; \
}() }()
// Parent scope: logs a begin and end event, and flushes the thread log on scope // Parent scope: logs a begin and end event, and flushes the thread log on scope
@ -277,6 +291,9 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
// scope. // scope.
#define GRPC_LATENT_SEE_MARK(name) \ #define GRPC_LATENT_SEE_MARK(name) \
grpc_core::latent_see::Mark(GRPC_LATENT_SEE_METADATA(name)) grpc_core::latent_see::Mark(GRPC_LATENT_SEE_METADATA(name))
#define GRPC_LATENT_SEE_PROMISE(name, promise) \
grpc_core::latent_see::Promise(GRPC_LATENT_SEE_METADATA("Poll:" name), \
GRPC_LATENT_SEE_METADATA(name), promise)
#else // !def(GRPC_ENABLE_LATENT_SEE) #else // !def(GRPC_ENABLE_LATENT_SEE)
namespace grpc_core { namespace grpc_core {
namespace latent_see { namespace latent_see {
@ -305,6 +322,7 @@ struct InnerScope {
#define GRPC_LATENT_SEE_MARK(name) \ #define GRPC_LATENT_SEE_MARK(name) \
do { \ do { \
} while (0) } while (0)
#define GRPC_LATENT_SEE_PROMISE(name, promise) promise
#endif // GRPC_ENABLE_LATENT_SEE #endif // GRPC_ENABLE_LATENT_SEE
#endif // GRPC_SRC_CORE_UTIL_LATENT_SEE_H #endif // GRPC_SRC_CORE_UTIL_LATENT_SEE_H

Loading…
Cancel
Save