[latent-see] Improve visibility of party wakeups (#38053)

Just used this to find out we always do a tcp write for client initial metadata prior to payload

Closes #38053

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38053 from ctiller:party-see 6b5a2ba6cf
PiperOrigin-RevId: 696371772
pull/38120/head^2
Craig Tiller 2 weeks ago committed by Copybara-Service
parent ed7854e83d
commit a0d9ddf9ce
  1. 3
      src/core/lib/promise/party.cc
  2. 2
      src/core/lib/promise/party.h
  3. 3
      src/core/lib/surface/client_call.cc
  4. 1
      src/core/util/latent_see.h

@ -357,6 +357,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) {
} }
void Party::AddParticipants(Participant** participants, size_t count) { void Party::AddParticipants(Participant** participants, size_t count) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipants");
uint64_t state = state_.load(std::memory_order_acquire); uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated; uint64_t allocated;
@ -400,6 +401,7 @@ void Party::AddParticipants(Participant** participants, size_t count) {
} }
void Party::AddParticipant(Participant* participant) { void Party::AddParticipant(Participant* participant) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipant");
uint64_t state = state_.load(std::memory_order_acquire); uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated; uint64_t allocated;
size_t slot; size_t slot;
@ -468,6 +470,7 @@ void Party::WakeupAsync(WakeupMask wakeup_mask) {
wakeup_mask_ |= wakeup_mask; wakeup_mask_ |= wakeup_mask;
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run( arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this, prev_state]() { [this, prev_state]() {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::WakeupAsync");
ApplicationCallbackExecCtx app_exec_ctx; ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx; ExecCtx exec_ctx;
RunLockedAndUnref(this, prev_state); RunLockedAndUnref(this, prev_state);

@ -342,6 +342,7 @@ class Party : public Activity, private Wakeable {
// Wakeable implementation // Wakeable implementation
void Wakeup(WakeupMask wakeup_mask) final { void Wakeup(WakeupMask wakeup_mask) final {
GRPC_LATENT_SEE_INNER_SCOPE("Party::Wakeup");
if (Activity::current() == this) { if (Activity::current() == this) {
wakeup_mask_ |= wakeup_mask; wakeup_mask_ |= wakeup_mask;
Unref(); Unref();
@ -352,6 +353,7 @@ class Party : public Activity, private Wakeable {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void WakeupFromState( GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void WakeupFromState(
uint64_t cur_state, WakeupMask wakeup_mask) { uint64_t cur_state, WakeupMask wakeup_mask) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::WakeupFromState");
DCHECK_NE(wakeup_mask & kWakeupMask, 0u) DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
<< "Wakeup mask must be non-zero: " << wakeup_mask; << "Wakeup mask must be non-zero: " << wakeup_mask;
while (true) { while (true) {

@ -184,6 +184,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {
template <typename Batch> template <typename Batch>
void ClientCall::ScheduleCommittedBatch(Batch batch) { void ClientCall::ScheduleCommittedBatch(Batch batch) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::ScheduleCommittedBatch");
auto cur_state = call_state_.load(std::memory_order_acquire); auto cur_state = call_state_.load(std::memory_order_acquire);
while (true) { while (true) {
switch (cur_state) { switch (cur_state) {
@ -225,6 +226,7 @@ void ClientCall::ScheduleCommittedBatch(Batch batch) {
} }
void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) { void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::StartCall");
auto cur_state = call_state_.load(std::memory_order_acquire); auto cur_state = call_state_.load(std::memory_order_acquire);
CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata, CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata,
send_initial_metadata_op.data.send_initial_metadata.count, send_initial_metadata_op.data.send_initial_metadata.count,
@ -271,6 +273,7 @@ void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) {
void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) { bool is_notify_tag_closure) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::CommitBatch");
if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) { if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) {
StartCall(ops[0]); StartCall(ops[0]);
EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);

@ -35,6 +35,7 @@
#include "absl/functional/function_ref.h" #include "absl/functional/function_ref.h"
#include "absl/log/log.h" #include "absl/log/log.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/util/per_cpu.h" #include "src/core/util/per_cpu.h"
#include "src/core/util/sync.h" #include "src/core/util/sync.h"

Loading…
Cancel
Save