[tracing] Remove debugging annotations

PiperOrigin-RevId: 610469149
pull/35999/head
Yousuk Seung 12 months ago committed by Copybara-Service
parent d4afc993ed
commit 641213ee58
  1. 38
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 1
      src/core/ext/transport/chttp2/transport/internal.h
  3. 32
      src/core/ext/transport/chttp2/transport/writing.cc
  4. 39
      src/core/lib/surface/call.cc
  5. 2
      src/python/grpcio/grpc/aio/_interceptor.py

@ -1300,7 +1300,6 @@ static void null_then_sched_closure(grpc_closure** closure) {
}
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error_handle error,
const char* desc,
@ -1325,12 +1324,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
write_state_name(t->write_state), whence.file(), whence.line());
}
if (s->call_tracer) {
s->call_tracer->RecordAnnotation(
absl::StrFormat("on_complete: s=%p %p desc=%s err=%s", s, closure, desc,
grpc_core::StatusToString(error).c_str()));
}
if (!error.ok()) {
grpc_error_handle cl_err =
grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
@ -1405,13 +1398,6 @@ static void perform_stream_op_locked(void* stream_op,
}
}
if (s->call_tracer) {
s->call_tracer->RecordAnnotation(absl::StrFormat(
"perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s, op,
grpc_transport_stream_op_batch_string(op, true).c_str(),
op->on_complete));
}
grpc_closure* on_complete = op->on_complete;
// on_complete will be null if and only if there are no send ops in the batch.
if (on_complete != nullptr) {
@ -1483,7 +1469,7 @@ static void perform_stream_op_locked(void* stream_op,
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_initial_metadata_finished,
t, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
@ -1503,7 +1489,7 @@ static void perform_stream_op_locked(void* stream_op,
// We should NOT return an error here, so as to avoid a cancel OP being
// started. The surface layer will notice that the stream has been closed
// for writes and fail the send message op.
grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
@ -1543,7 +1529,7 @@ static void perform_stream_op_locked(void* stream_op,
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
@ -1587,7 +1573,7 @@ static void perform_stream_op_locked(void* stream_op,
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_trailing_metadata_finished,
t, &s->send_trailing_metadata_finished,
op->payload->send_trailing_metadata.send_trailing_metadata->empty()
? absl::OkStatus()
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
@ -1640,7 +1626,7 @@ static void perform_stream_op_locked(void* stream_op,
}
if (on_complete != nullptr) {
grpc_chttp2_complete_closure_step(t, s, &on_complete, absl::OkStatus(),
grpc_chttp2_complete_closure_step(t, &on_complete, absl::OkStatus(),
"op->on_complete");
}
@ -2307,13 +2293,13 @@ static grpc_error_handle removal_error(grpc_error_handle extra_error,
return error;
}
static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
static void flush_write_list(grpc_chttp2_transport* t,
grpc_chttp2_write_cb** list,
grpc_error_handle error) {
while (*list) {
grpc_chttp2_write_cb* cb = *list;
*list = cb->next;
grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
grpc_chttp2_complete_closure_step(t, &cb->closure, error,
"on_write_finished_cb");
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
@ -2326,18 +2312,18 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
grpc_chttp2_complete_closure_step(t, &s->send_initial_metadata_finished,
error, "send_initial_metadata_finished");
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
grpc_chttp2_complete_closure_step(t, &s->send_trailing_metadata_finished,
error, "send_trailing_metadata_finished");
grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished, error,
grpc_chttp2_complete_closure_step(t, &s->send_message_finished, error,
"fetching_send_message_finished");
flush_write_list(t, s, &s->on_write_finished_cbs, error);
flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
flush_write_list(t, &s->on_write_finished_cbs, error);
flush_write_list(t, &s->on_flow_controlled_cbs, error);
}
grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(

@ -778,7 +778,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error_handle error,
const char* desc,

@ -82,10 +82,9 @@ static void add_to_write_list(grpc_chttp2_write_cb** list,
*list = cb;
}
static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_chttp2_write_cb* cb, grpc_error_handle error) {
grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
"finish_write_cb");
static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_write_cb* cb,
grpc_error_handle error) {
grpc_chttp2_complete_closure_step(t, &cb->closure, error, "finish_write_cb");
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
}
@ -186,9 +185,9 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
});
}
static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
int64_t send_bytes, grpc_chttp2_write_cb** list,
int64_t* ctr, grpc_error_handle error) {
static bool update_list(grpc_chttp2_transport* t, int64_t send_bytes,
grpc_chttp2_write_cb** list, int64_t* ctr,
grpc_error_handle error) {
bool sched_any = false;
grpc_chttp2_write_cb* cb = *list;
*list = nullptr;
@ -197,7 +196,7 @@ static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_chttp2_write_cb* next = cb->next;
if (cb->call_at_byte <= *ctr) {
sched_any = true;
finish_write_cb(t, s, cb, error);
finish_write_cb(t, cb, error);
} else {
add_to_write_list(list, cb);
}
@ -424,8 +423,7 @@ class DataSendContext {
void CallCallbacks() {
if (update_list(
t_, s_,
static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
t_, static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
&s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
absl::OkStatus())) {
write_context_->NoteScheduledResults();
@ -484,9 +482,9 @@ class StreamWriteContext {
s_->send_initial_metadata = nullptr;
s_->sent_initial_metadata = true;
write_context_->NoteScheduledResults();
grpc_chttp2_complete_closure_step(
t_, s_, &s_->send_initial_metadata_finished, absl::OkStatus(),
"send_initial_metadata_finished");
grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished,
absl::OkStatus(),
"send_initial_metadata_finished");
if (s_->call_tracer) {
grpc_core::HttpAnnotation::WriteStats write_stats;
write_stats.target_write_size = write_context_->target_write_size();
@ -583,9 +581,9 @@ class StreamWriteContext {
SentLastFrame();
write_context_->NoteScheduledResults();
grpc_chttp2_complete_closure_step(
t_, s_, &s_->send_trailing_metadata_finished, absl::OkStatus(),
"send_trailing_metadata_finished");
grpc_chttp2_complete_closure_step(t_, &s_->send_trailing_metadata_finished,
absl::OkStatus(),
"send_trailing_metadata_finished");
}
bool stream_became_writable() { return stream_became_writable_; }
@ -746,7 +744,7 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
if (s->sending_bytes != 0) {
update_list(t, s, static_cast<int64_t>(s->sending_bytes),
update_list(t, static_cast<int64_t>(s->sending_bytes),
&s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
error);
s->sending_bytes = 0;

@ -648,31 +648,12 @@ class FilterStackCall final : public Call {
}
bool completed_batch_step(PendingOp op) {
auto mask = PendingOpMask(op);
// Acquire call tracer before ops_pending_.fetch_sub to avoid races with
// call_ being set to nullptr in PostCompletion method. Store the
// call_tracer_ and call_ variables locally as well because they could be
// modified by another thread after the fetch_sub operation.
CallTracerAnnotationInterface* call_tracer = call_tracer_;
FilterStackCall* call = call_;
bool is_call_trace_enabled = grpc_call_trace.enabled();
bool is_call_ops_annotate_enabled =
(IsTraceRecordCallopsEnabled() && call_tracer != nullptr);
if (is_call_ops_annotate_enabled) {
call->InternalRef("Call ops annotate");
}
auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel);
if (is_call_trace_enabled || is_call_ops_annotate_enabled) {
std::string trace_string = absl::StrFormat(
"BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this,
PendingOpString(mask).c_str(), PendingOpString(r & ~mask).c_str(),
completion_data_.notify_tag.tag);
if (is_call_trace_enabled) {
gpr_log(GPR_DEBUG, "%s", trace_string.c_str());
}
if (is_call_ops_annotate_enabled) {
call_tracer->RecordAnnotation(trace_string);
call->InternalUnref("Call ops annotate");
}
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this,
PendingOpString(mask).c_str(),
PendingOpString(r & ~mask).c_str(),
completion_data_.notify_tag.tag);
}
GPR_ASSERT((r & mask) != 0);
return r == mask;
@ -1538,7 +1519,6 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
grpc_transport_stream_op_batch_payload* stream_op_payload;
uint32_t seen_ops = 0;
intptr_t pending_ops = 0;
CallTracerAnnotationInterface* call_tracer = nullptr;
for (i = 0; i < nops; i++) {
if (seen_ops & (1u << ops[i].op)) {
@ -1899,15 +1879,6 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
stream_op->on_complete = &bctl->finish_batch_;
}
call_tracer = static_cast<CallTracerAnnotationInterface*>(
ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
if ((IsTraceRecordCallopsEnabled() && call_tracer != nullptr)) {
call_tracer->RecordAnnotation(absl::StrFormat(
"BATCH:%p START:%s BATCH:%s (tag:%p)", bctl,
PendingOpString(pending_ops).c_str(),
grpc_transport_stream_op_batch_string(stream_op, true).c_str(),
bctl->completion_data_.notify_tag.tag));
}
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl,
PendingOpString(pending_ops).c_str(),

@ -476,7 +476,7 @@ class _InterceptedStreamResponseMixin:
_response_aiter: Optional[AsyncIterable[ResponseType]]
def _init_stream_response_mixin(self) -> None:
# Is initalized later, otherwise if the iterator is not finally
# Is initalized later, otherwise if the iterator is not finnally
# consumed a logging warning is emmited by Asyncio.
self._response_aiter = None

Loading…
Cancel
Save