[latent-see] Fix some bugs, add basic annotations through the stack (#37297)

Closes #37297

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37297 from ctiller:circle-of-life 6e0c455792
PiperOrigin-RevId: 656087393
Branch: pull/37320/head
Author: Craig Tiller (committed by Copybara-Service)
Commit: efe62e01f0
Parent: c02437a92f
17 changed files (changes per file in parentheses):

 1. BUILD (1)
 2. CMakeLists.txt (2)
 3. build_autogenerated.yaml (4)
 4. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (2)
 5. src/core/ext/transport/chttp2/transport/internal.h (2)
 6. src/core/ext/transport/chttp2/transport/parsing.cc (2)
 7. src/core/ext/transport/chttp2/transport/writing.cc (7)
 8. src/core/handshaker/security/secure_endpoint.cc (1)
 9. src/core/lib/channel/promise_based_filter.cc (5)
10. src/core/lib/channel/promise_based_filter.h (2)
11. src/core/lib/event_engine/posix_engine/posix_endpoint.cc (3)
12. src/core/lib/iomgr/exec_ctx.h (13)
13. src/core/lib/iomgr/tcp_posix.cc (3)
14. src/core/lib/surface/completion_queue.cc (6)
15. src/core/lib/surface/filter_stack_call.cc (2)
16. src/core/util/latent_see.cc (13)
17. src/core/util/latent_see.h (96)

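Aside from the build wiring (BUILD, CMakeLists.txt, build_autogenerated.yaml), most of the changes below follow a single pattern: a hot-path function gets one RAII annotation at the top of its body so latent-see can attribute the time spent inside it. A minimal usage sketch, with a hypothetical function name (the macro is the one this commit uses throughout):

#include "src/core/util/latent_see.h"

// Hypothetical hot-path function, annotated in the style this commit adds.
void DoExpensiveWork() {
  // Records a kBegin event now and the matching kEnd event when the scope
  // exits; both land in the Bin owned by the enclosing ParentScope.
  GRPC_LATENT_SEE_INNER_SCOPE("DoExpensiveWork");
  // ... actual work ...
}
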
BUILD

@@ -3279,6 +3279,7 @@ grpc_cc_library(
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_spinlock",
"//src/core:latent_see",
"//src/core:time",
"//src/core:useful",
],

CMakeLists.txt (generated)

@@ -22414,6 +22414,7 @@ add_executable(periodic_update_test
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/per_cpu.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/closure.cc
@@ -22426,6 +22427,7 @@ add_executable(periodic_update_test
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/util/latent_see.cc
test/core/resource_quota/periodic_update_test.cc
)
if(WIN32 AND MSVC)

build_autogenerated.yaml

@@ -14454,6 +14454,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/per_cpu.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
@@ -14468,6 +14469,7 @@ targets:
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@@ -14478,6 +14480,7 @@ targets:
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/per_cpu.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/closure.cc
@@ -14490,6 +14493,7 @@ targets:
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/util/latent_see.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- gtest

src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@@ -1026,6 +1026,7 @@ static const char* begin_writing_desc(bool partial) {
static void write_action_begin_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle /*error_ignored*/) {
GRPC_LATENT_SEE_INNER_SCOPE("write_action_begin_locked");
CHECK(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r;
if (!t->closed_with_error.ok()) {
@@ -2676,6 +2677,7 @@ static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
static void read_action_parse_loop_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
GRPC_LATENT_SEE_INNER_SCOPE("read_action_parse_loop_locked");
if (t->closed_with_error.ok()) {
grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
size_t requests_started = 0;

src/core/ext/transport/chttp2/transport/internal.h

@@ -545,6 +545,8 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
// What percentage of rst_stream frames on the server should cause a ping
// frame to be generated.
uint8_t ping_on_rst_stream_percent;
GPR_NO_UNIQUE_ADDRESS grpc_core::latent_see::Flow write_flow;
};
typedef enum {

src/core/ext/transport/chttp2/transport/parsing.cc

@@ -205,6 +205,8 @@ std::string FrameTypeString(uint8_t frame_type, uint8_t flags) {
absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
grpc_chttp2_transport* t, const grpc_slice& slice,
size_t& requests_started) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_perform_read");
const uint8_t* beg = GRPC_SLICE_START_PTR(slice);
const uint8_t* end = GRPC_SLICE_END_PTR(slice);
const uint8_t* cur = beg;

src/core/ext/transport/chttp2/transport/writing.cc

@@ -677,6 +677,8 @@ class StreamWriteContext {
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_transport* t) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_begin_write");
int64_t outbuf_relative_start_pos = 0;
WriteContext ctx(t);
ctx.FlushSettings();
@@ -732,12 +734,17 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
maybe_initiate_ping(t);
t->write_flow.Begin(GRPC_LATENT_SEE_METADATA("write"));
return ctx.Result();
}
void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_end_write");
grpc_chttp2_stream* s;
t->write_flow.End();
if (t->channelz_socket != nullptr) {
t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
}
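
The write_flow member added to grpc_chttp2_transport ties the begin-write event to the matching end-write event even though they are reached from different callbacks. The same Flow pattern in a hypothetical asynchronous writer (AsyncWriter is illustrative, not part of the commit), assuming an enclosing ParentScope is active when these methods run:

class AsyncWriter {
 public:
  void StartWrite() {
    // Ends any previous flow, then emits kFlowStart with a fresh flow id.
    flow_.Begin(GRPC_LATENT_SEE_METADATA("write"));
    // ... kick off the asynchronous write ...
  }
  void OnWriteDone() {
    // Emits kFlowEnd with the id recorded by Begin(), closing the flow.
    flow_.End();
  }

 private:
  GPR_NO_UNIQUE_ADDRESS grpc_core::latent_see::Flow flow_;
};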

src/core/handshaker/security/secure_endpoint.cc

@@ -387,6 +387,7 @@ static void on_write(void* user_data, grpc_error_handle error) {
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg, int max_frame_size) {
GRPC_LATENT_SEE_INNER_SCOPE("secure_endpoint write");
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);

src/core/lib/channel/promise_based_filter.cc

@@ -244,7 +244,10 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
///////////////////////////////////////////////////////////////////////////////
// BaseCallData::Flusher
BaseCallData::Flusher::Flusher(BaseCallData* call) : call_(call) {
BaseCallData::Flusher::Flusher(BaseCallData* call)
: latent_see::InnerScope(
GRPC_LATENT_SEE_METADATA("PromiseBasedFilter Flusher")),
call_(call) {
GRPC_CALL_STACK_REF(call_->call_stack(), "flusher");
}

src/core/lib/channel/promise_based_filter.h

@@ -947,7 +947,7 @@ class BaseCallData : public Activity, private Wakeable {
}
};
class Flusher {
class Flusher : public latent_see::InnerScope {
public:
explicit Flusher(BaseCallData* call);
// Calls closures, schedules batches, relinquishes call combiner.
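
Instead of a macro in the constructor body, Flusher opts in by inheriting from latent_see::InnerScope, so the scope spans the Flusher's entire lifetime. The same technique applied to a hypothetical class (BatchRunner is illustrative only), again assuming an enclosing ParentScope, in practice the ambient ExecCtx, is active:

class BatchRunner : public grpc_core::latent_see::InnerScope {
 public:
  BatchRunner()
      : grpc_core::latent_see::InnerScope(
            GRPC_LATENT_SEE_METADATA("BatchRunner")) {}
  // kBegin was recorded when the base subobject was constructed; kEnd is
  // recorded when it is destroyed, after the derived class's members.
};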

src/core/lib/event_engine/posix_engine/posix_endpoint.cc

@@ -102,6 +102,7 @@ namespace {
// of bytes sent.
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpSend");
ssize_t sent_length;
do {
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
@@ -286,6 +287,8 @@ absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) const {
// Returns true if data available to read or error other than EAGAIN.
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpDoRead");
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;

src/core/lib/iomgr/exec_ctx.h

@@ -41,6 +41,7 @@
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/time_precise.h"
#if !defined(_WIN32) || !defined(_DLL)
@@ -109,17 +110,23 @@ class Combiner;
/// since that implies a core re-entry outside of application
/// callbacks.
///
class GRPC_DLL ExecCtx {
class GRPC_DLL ExecCtx : public latent_see::ParentScope {
public:
/// Default Constructor
ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
ExecCtx()
: latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")),
flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
Fork::IncExecCtxCount();
Set(this);
}
/// Parameterised Constructor
explicit ExecCtx(uintptr_t fl) : flags_(fl) {
explicit ExecCtx(uintptr_t fl)
: ExecCtx(fl, GRPC_LATENT_SEE_METADATA("ExecCtx")) {}
explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata)
: latent_see::ParentScope(latent_see_metadata), flags_(fl) {
if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
Fork::IncExecCtxCount();
}
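
With ExecCtx acting as the ParentScope, any thread that enters gRPC through an ExecCtx opens a Bin, and the inner scopes and flows recorded while it is on the stack are flushed when it unwinds. The new two-argument constructor lets subclasses supply a more descriptive label, which is how the completion-queue wrappers further below use it; a hypothetical subclass for illustration (ExecCtxForPolling is not part of the commit):

class ExecCtxForPolling : public grpc_core::ExecCtx {
 public:
  // Labels the parent scope "ExecCtx for Polling" instead of plain "ExecCtx".
  ExecCtxForPolling()
      : ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for Polling")) {}
};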

src/core/lib/iomgr/tcp_posix.cc

@@ -915,6 +915,7 @@ static void update_rcvlowat(grpc_tcp* tcp)
#define MAX_READ_IOVEC 64
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
GRPC_LATENT_SEE_INNER_SCOPE("tcp_do_read");
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " do_read";
}
@@ -1211,6 +1212,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
// of bytes sent.
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("tcp_send");
ssize_t sent_length;
do {
// TODO(klempner): Cork if this is a partial write
@@ -1408,6 +1410,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
/// messages from the queue.
///
static bool process_errors(grpc_tcp* tcp) {
GRPC_LATENT_SEE_INNER_SCOPE("process_errors");
bool processed_err = false;
struct iovec iov;
iov.iov_base = nullptr;

src/core/lib/surface/completion_queue.cc

@@ -886,7 +886,8 @@ struct cq_is_finished_arg {
class ExecCtxNext : public grpc_core::ExecCtx {
public:
explicit ExecCtxNext(void* arg)
: ExecCtx(0), check_ready_to_finish_arg_(arg) {}
: ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqNext")),
check_ready_to_finish_arg_(arg) {}
bool CheckReadyToFinish() override {
cq_is_finished_arg* a =
@@ -1133,7 +1134,8 @@ static void del_plucker(grpc_completion_queue* cq, void* tag,
class ExecCtxPluck : public grpc_core::ExecCtx {
public:
explicit ExecCtxPluck(void* arg)
: ExecCtx(0), check_ready_to_finish_arg_(arg) {}
: ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqPluck")),
check_ready_to_finish_arg_(arg) {}
bool CheckReadyToFinish() override {
cq_is_finished_arg* a =

src/core/lib/surface/filter_stack_call.cc

@@ -730,6 +730,8 @@ void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) {
GRPC_LATENT_SEE_INNER_SCOPE("FilterStackCall::StartBatch");
size_t i;
const grpc_op* op;
BatchControl* bctl;

src/core/util/latent_see.cc

@@ -24,9 +24,11 @@
namespace grpc_core {
namespace latent_see {
thread_local std::vector<Log::Event> Log::thread_events_;
thread_local uint64_t Log::thread_id_ = Log::Get().next_thread_id_.fetch_add(1);
thread_local Bin* Log::bin_ = nullptr;
thread_local void* Log::bin_owner_ = nullptr;
std::atomic<uint64_t> Flow::next_flow_id_{1};
std::atomic<Bin*> Log::free_bins_;
std::string Log::GenerateJson() {
std::vector<RecordedEvent> events;
@@ -91,9 +93,8 @@ std::string Log::GenerateJson() {
return json;
}
void Log::FlushThreadLog() {
auto& thread_events = thread_events_;
if (thread_events.empty()) return;
void Log::FlushBin(Bin* bin) {
if (bin->events.empty()) return;
auto& log = Get();
const auto batch_id =
log.next_batch_id_.fetch_add(1, std::memory_order_relaxed);
@@ -101,11 +102,11 @@ void Log::FlushThreadLog() {
const auto thread_id = thread_id_;
{
MutexLock lock(&fragment.mu);
for (auto event : thread_events) {
for (auto event : bin->events) {
fragment.events.push_back(RecordedEvent{thread_id, batch_id, event});
}
}
thread_events.clear();
bin->events.clear();
}
} // namespace latent_see

src/core/util/latent_see.h

@@ -39,19 +39,58 @@ struct Metadata {
enum class EventType : uint8_t { kBegin, kEnd, kFlowStart, kFlowEnd, kMark };
// A bin collects all events that occur within a parent scope.
struct Bin {
struct Event {
const Metadata* metadata;
std::chrono::steady_clock::time_point timestamp;
uint64_t id;
EventType type;
};
void Append(const Metadata* metadata, EventType type, uint64_t id) {
events.push_back(
Event{metadata, std::chrono::steady_clock::now(), id, type});
}
std::vector<Event> events;
Bin* next_free;
};
class Log {
public:
static void FlushThreadLog();
static Bin* MaybeStartBin(void* owner) {
if (bin_ != nullptr) return bin_;
Bin* bin = free_bins_.load(std::memory_order_acquire);
do {
if (bin == nullptr) {
bin = new Bin();
break;
}
} while (!free_bins_.compare_exchange_weak(bin, bin->next_free,
std::memory_order_acq_rel));
bin_ = bin;
bin_owner_ = owner;
return bin;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static void Append(
const Metadata* metadata, EventType type, uint64_t id) {
thread_events_.push_back(
Event{metadata, std::chrono::steady_clock::now(), id, type});
static void EndBin(void* owner) {
if (bin_owner_ != owner) return;
FlushBin(bin_);
bin_->next_free = free_bins_.load(std::memory_order_acquire);
while (!free_bins_.compare_exchange_weak(bin_->next_free, bin_,
std::memory_order_acq_rel)) {
}
bin_ = nullptr;
}
static Bin* CurrentThreadBin() { return bin_; }
private:
Log() = default;
static void FlushBin(Bin* bin);
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Log& Get() {
static Log* log = []() {
atexit([] {
@@ -68,21 +107,17 @@ class Log {
std::string GenerateJson();
struct Event {
const Metadata* metadata;
std::chrono::steady_clock::time_point timestamp;
uint64_t id;
EventType type;
};
struct RecordedEvent {
uint64_t thread_id;
uint64_t batch_id;
Event event;
Bin::Event event;
};
std::atomic<uint64_t> next_thread_id_{1};
std::atomic<uint64_t> next_batch_id_{1};
static thread_local std::vector<Event> thread_events_;
static thread_local uint64_t thread_id_;
static thread_local Bin* bin_;
static thread_local void* bin_owner_;
static std::atomic<Bin*> free_bins_;
struct Fragment {
Mutex mu;
std::vector<RecordedEvent> events ABSL_GUARDED_BY(mu);
@@ -90,16 +125,16 @@ class Log {
PerCpu<Fragment> fragments_{PerCpuOptions()};
};
template <bool kFlush>
template <bool kParent>
class Scope {
public:
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Scope(const Metadata* metadata)
: metadata_(metadata) {
Log::Append(metadata_, EventType::kBegin, 0);
bin_->Append(metadata_, EventType::kBegin, 0);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Scope() {
Log::Append(metadata_, EventType::kEnd, 0);
if (kFlush) Log::FlushThreadLog();
bin_->Append(metadata_, EventType::kEnd, 0);
if (kParent) Log::EndBin(this);
}
Scope(const Scope&) = delete;
@@ -107,6 +142,8 @@ class Scope {
private:
const Metadata* const metadata_;
Bin* const bin_ =
kParent ? Log::MaybeStartBin(this) : Log::CurrentThreadBin();
};
using ParentScope = Scope<true>;
@@ -118,11 +155,11 @@ class Flow {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Flow(const Metadata* metadata)
: metadata_(metadata),
id_(next_flow_id_.fetch_add(1, std::memory_order_relaxed)) {
Log::Append(metadata_, EventType::kFlowStart, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowStart, id_);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Flow() {
if (metadata_ != nullptr) {
Log::Append(metadata_, EventType::kFlowEnd, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
}
}
@@ -131,7 +168,9 @@ class Flow {
Flow(Flow&& other) noexcept
: metadata_(std::exchange(other.metadata_, nullptr)), id_(other.id_) {}
Flow& operator=(Flow&& other) noexcept {
if (metadata_ != nullptr) Log::Append(metadata_, EventType::kFlowEnd, id_);
if (metadata_ != nullptr) {
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
}
metadata_ = std::exchange(other.metadata_, nullptr);
id_ = other.id_;
return *this;
@@ -142,15 +181,18 @@ class Flow {
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {
if (metadata_ == nullptr) return;
Log::Append(metadata_, EventType::kFlowEnd, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
metadata_ = nullptr;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(const Metadata* metadata) {
if (metadata_ != nullptr) Log::Append(metadata_, EventType::kFlowEnd, id_);
auto* bin = Log::CurrentThreadBin();
if (metadata_ != nullptr) {
bin->Append(metadata_, EventType::kFlowEnd, id_);
}
metadata_ = metadata;
if (metadata_ == nullptr) return;
id_ = next_flow_id_.fetch_add(1, std::memory_order_relaxed);
Log::Append(metadata_, EventType::kFlowStart, id_);
bin->Append(metadata_, EventType::kFlowStart, id_);
}
private:
@@ -160,7 +202,7 @@ class Flow {
};
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
Log::Append(md, EventType::kMark, 0);
Log::CurrentThreadBin()->Append(md, EventType::kMark, 0);
}
} // namespace latent_see
@@ -197,6 +239,12 @@ struct Flow {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(Metadata*) {}
};
struct ParentScope {
explicit ParentScope(Metadata*) {}
};
struct InnerScope {
explicit InnerScope(Metadata*) {}
};
} // namespace latent_see
} // namespace grpc_core
#define GRPC_LATENT_SEE_METADATA(name) nullptr
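
Putting the pieces together: a ParentScope (in practice an ExecCtx) checks a Bin out of the lock-free free list via MaybeStartBin, every InnerScope and Flow on that thread appends its events into the current Bin, and when the owning ParentScope is destroyed EndBin flushes the events into the per-CPU fragments and returns the Bin to the free list. A sketch of that lifecycle with a hypothetical worker function:

void WorkerIteration() {
  grpc_core::ExecCtx exec_ctx;               // ParentScope: MaybeStartBin(this)
  {
    GRPC_LATENT_SEE_INNER_SCOPE("do_work");  // kBegin/kEnd appended to the Bin
    // ... actual work ...
  }
}  // ~ExecCtx: if this scope started the Bin, EndBin flushes and recycles it

When latent-see is compiled out, the fallback definitions above make all of this a no-op: ParentScope and InnerScope are empty structs and GRPC_LATENT_SEE_METADATA expands to nullptr.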
