diff --git a/BUILD b/BUILD index 9745ce61a69..69fc678baf3 100644 --- a/BUILD +++ b/BUILD @@ -2916,6 +2916,7 @@ grpc_cc_library( "//src/core:channel_stack_type", "//src/core:context", "//src/core:error", + "//src/core:experiments", "//src/core:logging_filter", "//src/core:metadata_batch", "//src/core:slice", @@ -4686,6 +4687,7 @@ grpc_cc_library( "absl/strings", ], deps = [ + "call_tracer", "chttp2_bin_encoder", "chttp2_legacy_frame", "chttp2_varint", diff --git a/requirements.bazel.txt b/requirements.bazel.txt index 80a71e2c84b..02d32adc5a3 100644 --- a/requirements.bazel.txt +++ b/requirements.bazel.txt @@ -7,7 +7,7 @@ oauth2client==4.1.0 requests==2.25.1 urllib3==1.26.18 chardet==3.0.4 -certifi==2024.7.4 +certifi==2023.7.22 idna==2.7 gevent==22.08.0 zope.event==4.5.0 diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e1ef1d46e98..3a5c6ea7544 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -257,6 +257,7 @@ grpc_core::CopyContextFn g_get_copied_context_fn = nullptr; namespace grpc_core { namespace { + // Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds // the passed in reference to \a t until it's moved into Fn. template , grpc_error_handle)> @@ -272,13 +273,12 @@ grpc_closure* InitTransportClosure(RefCountedPtr t, t.release(), nullptr); return c; } -} // namespace -namespace { TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr; TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback = nullptr; bool test_only_disable_transient_failure_state_notification = false; + } // namespace void TestOnlySetGlobalHttp2TransportInitCallback( @@ -361,6 +361,34 @@ std::string HttpAnnotation::ToString() const { return s; } +void Chttp2CallTracerWrapper::RecordIncomingBytes( + const CallTracerInterface::TransportByteSize& transport_byte_size) { + // Update legacy API. + stream_->stats.incoming.framing_bytes += transport_byte_size.framing_bytes; + stream_->stats.incoming.data_bytes += transport_byte_size.data_bytes; + stream_->stats.incoming.header_bytes += transport_byte_size.header_bytes; + // Update new API. + if (!IsCallTracerInTransportEnabled()) return; + auto* call_tracer = stream_->arena->GetContext(); + if (call_tracer != nullptr) { + call_tracer->RecordIncomingBytes(transport_byte_size); + } +} + +void Chttp2CallTracerWrapper::RecordOutgoingBytes( + const CallTracerInterface::TransportByteSize& transport_byte_size) { + // Update legacy API. + stream_->stats.outgoing.framing_bytes += transport_byte_size.framing_bytes; + stream_->stats.outgoing.data_bytes += transport_byte_size.data_bytes; + stream_->stats.outgoing.header_bytes += + transport_byte_size.header_bytes; // Update new API. + if (!IsCallTracerInTransportEnabled()) return; + auto* call_tracer = stream_->arena->GetContext(); + if (call_tracer != nullptr) { + call_tracer->RecordOutgoingBytes(transport_byte_size); + } +} + } // namespace grpc_core // @@ -814,7 +842,8 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, return refcount; }()), arena(arena), - flow_control(&t->flow_control) { + flow_control(&t->flow_control), + call_tracer_wrapper(this) { t->streams_allocated.fetch_add(1, std::memory_order_relaxed); if (server_data) { id = static_cast(reinterpret_cast(server_data)); @@ -1344,7 +1373,9 @@ static void perform_stream_op_locked(void* stream_op, grpc_chttp2_transport* t = s->t.get(); s->traced = op->is_traced; - s->call_tracer = CallTracerIfSampled(s); + if (!grpc_core::IsCallTracerInTransportEnabled()) { + s->call_tracer = CallTracerIfSampled(s); + } s->tcp_tracer = TcpTracerIfSampled(s); if (GRPC_TRACE_FLAG_ENABLED(http)) { LOG(INFO) << "perform_stream_op_locked[s=" << s << "; op=" << op @@ -1375,12 +1406,24 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { - if (s->call_tracer != nullptr) { - s->call_tracer->RecordAnnotation( - grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, - gpr_now(GPR_CLOCK_REALTIME)) - .Add(s->t->flow_control.stats()) - .Add(s->flow_control.stats())); + if (!grpc_core::IsCallTracerInTransportEnabled()) { + if (s->call_tracer != nullptr) { + s->call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s->t->flow_control.stats()) + .Add(s->flow_control.stats())); + } + } else if (grpc_core::IsTraceRecordCallopsEnabled()) { + auto* call_tracer = + s->arena->GetContext(); + if (call_tracer != nullptr && call_tracer->IsSampled()) { + call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s->t->flow_control.stats()) + .Add(s->flow_control.stats())); + } } if (t->is_client && t->channelz_socket != nullptr) { t->channelz_socket->RecordStreamStartedFromLocal(); @@ -1464,9 +1507,8 @@ static void perform_stream_op_locked(void* stream_op, frame_hdr[3] = static_cast(len >> 8); frame_hdr[4] = static_cast(len); - s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES; - s->stats.outgoing.data_bytes += - op_payload->send_message.send_message->Length(); + s->call_tracer_wrapper.RecordOutgoingBytes( + {GRPC_HEADER_SIZE_IN_BYTES, len, 0}); s->next_message_end_offset = s->flow_controlled_bytes_written + static_cast(s->flow_controlled_buffer.length) + diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 9ef84e8d475..e16965c4478 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -55,7 +55,7 @@ absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags, void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, uint32_t write_bytes, int is_eof, - grpc_transport_one_way_stats* stats, + grpc_core::CallTracerInterface* call_tracer, grpc_slice_buffer* outbuf) { grpc_slice hdr; uint8_t* p; @@ -77,7 +77,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf); - stats->framing_bytes += header_size; + call_tracer->RecordOutgoingBytes({header_size, 0, 0}); } grpc_core::Poll grpc_deframe_unprocessed_incoming_frames( @@ -126,8 +126,7 @@ grpc_core::Poll grpc_deframe_unprocessed_incoming_frames( if (min_progress_size != nullptr) *min_progress_size = 0; if (stream_out != nullptr) { - s->stats.incoming.framing_bytes += 5; - s->stats.incoming.data_bytes += length; + s->call_tracer_wrapper.RecordIncomingBytes({5, length, 0}); grpc_slice_buffer_move_first_into_buffer(slices, 5, header); grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer()); } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 1e628d48ddf..eae1f439c3d 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -33,6 +33,7 @@ #include "src/core/lib/promise/poll.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/transport.h" +#include "src/core/telemetry/call_tracer.h" // start processing a new data frame absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags, @@ -49,7 +50,7 @@ grpc_error_handle grpc_chttp2_data_parser_parse(void* parser, void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, uint32_t write_bytes, int is_eof, - grpc_transport_one_way_stats* stats, + grpc_core::CallTracerInterface* call_tracer, grpc_slice_buffer* outbuf); grpc_core::Poll grpc_deframe_unprocessed_incoming_frames( diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc index f2c7d278e2c..540d4faaf2a 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc @@ -39,11 +39,13 @@ #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/metadata_batch.h" -grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, - grpc_transport_one_way_stats* stats) { +grpc_slice grpc_chttp2_rst_stream_create( + uint32_t id, uint32_t code, grpc_core::CallTracerInterface* call_tracer) { static const size_t frame_size = 13; grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); - if (stats != nullptr) stats->framing_bytes += frame_size; + if (call_tracer != nullptr) { + call_tracer->RecordOutgoingBytes({frame_size, 0, 0}); + } uint8_t* p = GRPC_SLICE_START_PTR(slice); // Frame size. @@ -70,10 +72,10 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, void grpc_chttp2_add_rst_stream_to_next_write( grpc_chttp2_transport* t, uint32_t id, uint32_t code, - grpc_transport_one_way_stats* stats) { + grpc_core::CallTracerInterface* call_tracer) { t->num_pending_induced_frames++; grpc_slice_buffer_add(&t->qbuf, - grpc_chttp2_rst_stream_create(id, code, stats)); + grpc_chttp2_rst_stream_create(id, code, call_tracer)); } grpc_error_handle grpc_chttp2_rst_stream_parser_begin_frame( @@ -102,7 +104,8 @@ grpc_error_handle grpc_chttp2_rst_stream_parser_parse(void* parser, cur++; p->byte++; } - s->stats.incoming.framing_bytes += static_cast(end - cur); + uint64_t framing_bytes = static_cast(end - cur); + s->call_tracer_wrapper.RecordIncomingBytes({framing_bytes, 0, 0}); if (p->byte == 4) { CHECK(is_last); diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 1da9c0e457a..0e8e74b62a2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -27,20 +27,22 @@ #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/transport/transport.h" +#include "src/core/telemetry/call_tracer.h" struct grpc_chttp2_rst_stream_parser { uint8_t byte; uint8_t reason_bytes[4]; }; -grpc_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, - grpc_transport_one_way_stats* stats); +grpc_slice grpc_chttp2_rst_stream_create( + uint32_t stream_id, uint32_t code, + grpc_core::CallTracerInterface* call_tracer); // Adds RST_STREAM frame to t->qbuf (buffer for the next write). Should be // called when we want to add RST_STREAM and we are not in // write_action_begin_locked. void grpc_chttp2_add_rst_stream_to_next_write( grpc_chttp2_transport* t, uint32_t id, uint32_t code, - grpc_transport_one_way_stats* stats); + grpc_core::CallTracerInterface* call_tracer); grpc_error_handle grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags); diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.cc b/src/core/ext/transport/chttp2/transport/frame_window_update.cc index f9824f7e035..efae1615eb9 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.cc +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.cc @@ -32,10 +32,13 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" grpc_slice grpc_chttp2_window_update_create( - uint32_t id, uint32_t window_delta, grpc_transport_one_way_stats* stats) { + uint32_t id, uint32_t window_delta, + grpc_core::CallTracerInterface* call_tracer) { static const size_t frame_size = 13; grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); - stats->framing_bytes += frame_size; + if (call_tracer != nullptr) { + call_tracer->RecordOutgoingBytes({frame_size, 0, 0}); + } uint8_t* p = GRPC_SLICE_START_PTR(slice); CHECK(window_delta); @@ -84,7 +87,8 @@ grpc_error_handle grpc_chttp2_window_update_parser_parse( } if (s != nullptr) { - s->stats.incoming.framing_bytes += static_cast(end - cur); + uint64_t framing_bytes = static_cast(end - cur); + s->call_tracer_wrapper.RecordIncomingBytes({framing_bytes, 0, 0}); } if (p->byte == 4) { diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index 92ea1587010..53c5f41d3db 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -27,6 +27,7 @@ #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/transport/transport.h" +#include "src/core/telemetry/call_tracer.h" struct grpc_chttp2_window_update_parser { uint8_t byte; @@ -34,7 +35,8 @@ struct grpc_chttp2_window_update_parser { uint32_t amount; }; grpc_slice grpc_chttp2_window_update_create( - uint32_t id, uint32_t window_delta, grpc_transport_one_way_stats* stats); + uint32_t id, uint32_t window_delta, + grpc_core::CallTracerInterface* call_tracer); grpc_error_handle grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser* parser, uint32_t length, uint8_t flags); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 3f3acd40aa0..54a0865869d 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -87,7 +87,7 @@ void HPackCompressor::Frame(const EncodeHeaderOptions& options, if (options.is_end_of_stream) { flags |= GRPC_CHTTP2_DATA_FLAG_END_STREAM; } - options.stats->header_bytes += raw.Length(); + options.call_tracer->RecordOutgoingBytes({0, 0, raw.Length()}); while (frame_type == GRPC_CHTTP2_FRAME_HEADER || raw.Length() > 0) { // per the HTTP/2 spec: // A HEADERS frame without the END_HEADERS flag set MUST be followed by @@ -101,7 +101,7 @@ void HPackCompressor::Frame(const EncodeHeaderOptions& options, } FillHeader(grpc_slice_buffer_tiny_add(output, kHeadersFrameHeaderSize), frame_type, options.stream_id, len, flags); - options.stats->framing_bytes += kHeadersFrameHeaderSize; + options.call_tracer->RecordOutgoingBytes({kHeadersFrameHeaderSize, 0, 0}); grpc_slice_buffer_move_first(raw.c_slice_buffer(), len, output); frame_type = GRPC_CHTTP2_FRAME_CONTINUATION; diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index 7c99d7b2146..cfbf2426ec9 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -42,6 +42,7 @@ #include "src/core/lib/transport/metadata_compression_traits.h" #include "src/core/lib/transport/timeout_encoding.h" #include "src/core/lib/transport/transport.h" +#include "src/core/telemetry/call_tracer.h" namespace grpc_core { @@ -353,7 +354,7 @@ class HPackCompressor { bool is_end_of_stream; bool use_true_binary_metadata; size_t max_frame_size; - grpc_transport_one_way_stats* stats; + CallTracerInterface* call_tracer; }; template diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0623802f1b9..ae306c925de 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -554,6 +554,51 @@ typedef enum { GRPC_METADATA_PUBLISHED_AT_CLOSE } grpc_published_metadata_method; +namespace grpc_core { + +// A CallTracer wrapper that updates both the legacy and new APIs for +// transport byte sizes. +// TODO(ctiller): This can go away as part of removing the +// grpc_transport_stream_stats struct. +class Chttp2CallTracerWrapper final : public CallTracerInterface { + public: + explicit Chttp2CallTracerWrapper(grpc_chttp2_stream* stream) + : stream_(stream) {} + + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; + + // Everything else is a no-op. + void RecordSendInitialMetadata( + grpc_metadata_batch* /*send_initial_metadata*/) override {} + void RecordSendTrailingMetadata( + grpc_metadata_batch* /*send_trailing_metadata*/) override {} + void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendCompressedMessage( + const SliceBuffer& /*send_compressed_message*/) override {} + void RecordReceivedInitialMetadata( + grpc_metadata_batch* /*recv_initial_metadata*/) override {} + void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedDecompressedMessage( + const SliceBuffer& /*recv_decompressed_message*/) override {} + void RecordCancel(grpc_error_handle /*cancel_error*/) override {} + std::shared_ptr StartNewTcpTrace() override { + return nullptr; + } + void RecordAnnotation(absl::string_view /*annotation*/) override {} + void RecordAnnotation(const Annotation& /*annotation*/) override {} + std::string TraceId() override { return ""; } + std::string SpanId() override { return ""; } + bool IsSampled() override { return false; } + + private: + grpc_chttp2_stream* stream_; +}; + +} // namespace grpc_core + struct grpc_chttp2_stream { grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, const void* server_data, grpc_core::Arena* arena); @@ -653,7 +698,11 @@ struct grpc_chttp2_stream { /// Number of times written int64_t write_counter = 0; + grpc_core::Chttp2CallTracerWrapper call_tracer_wrapper; + /// Only set when enabled. + // TODO(roth): Remove this when the call_tracer_in_transport + // experiment finishes rolling out. grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr; /// Only set when enabled. diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index d341996232b..82c5a188b47 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -555,7 +555,7 @@ static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) { return init_non_header_skip_frame_parser(t); } s->received_bytes += t->incoming_frame_size; - s->stats.incoming.framing_bytes += 9; + s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); if (s->read_closed) { return init_non_header_skip_frame_parser(t); } @@ -574,7 +574,7 @@ error_handler: absl_status_to_grpc_error(status)); grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id, GRPC_HTTP2_PROTOCOL_ERROR, - &s->stats.outgoing); + &s->call_tracer_wrapper); return init_non_header_skip_frame_parser(t); } else { return absl_status_to_grpc_error(status); @@ -725,7 +725,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, t->incoming_stream = s; } DCHECK_NE(s, nullptr); - s->stats.incoming.framing_bytes += 9; + s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); if (GPR_UNLIKELY(s->read_closed)) { GRPC_CHTTP2_IF_TRACING(ERROR) << "skipping already closed grpc_chttp2_stream header"; @@ -796,7 +796,7 @@ static grpc_error_handle init_window_update_frame_parser( } return init_non_header_skip_frame_parser(t); } - s->stats.incoming.framing_bytes += 9; + s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); } t->parser = grpc_chttp2_transport::Parser{ "window_update", grpc_chttp2_window_update_parser_parse, @@ -822,7 +822,7 @@ static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) { if (!t->incoming_stream) { return init_non_header_skip_frame_parser(t); } - s->stats.incoming.framing_bytes += 9; + s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); t->parser = grpc_chttp2_transport::Parser{ "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream}; if (!t->is_client && grpc_core::IsRstpitEnabled()) { @@ -918,7 +918,7 @@ static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) { grpc_chttp2_transport* t = s->t.get(); if (!s->write_closed) { grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing); + &s->call_tracer_wrapper); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(t, s, true, true, absl::OkStatus()); } @@ -933,9 +933,12 @@ grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser, auto* parser = static_cast(hpack_parser); grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr; if (s != nullptr) { - s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice); + s->call_tracer_wrapper.RecordIncomingBytes( + {0, 0, GRPC_SLICE_LENGTH(slice)}); call_tracer = - s->arena->GetContext(); + grpc_core::IsCallTracerInTransportEnabled() + ? s->arena->GetContext() + : s->arena->GetContext(); } grpc_error_handle error = parser->Parse( slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index a0c0db88a77..2f423069b63 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -296,10 +296,9 @@ class WriteContext { uint32_t transport_announce = t_->flow_control.MaybeSendUpdate( t_->outbuf.c_slice_buffer()->count > 0); if (transport_announce) { - grpc_transport_one_way_stats throwaway_stats; - grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(), - grpc_chttp2_window_update_create( - 0, transport_announce, &throwaway_stats)); + grpc_slice_buffer_add( + t_->outbuf.c_slice_buffer(), + grpc_chttp2_window_update_create(0, transport_announce, nullptr)); grpc_chttp2_reset_ping_clock(t_); } } @@ -411,7 +410,7 @@ class DataSendContext { s_->send_trailing_metadata != nullptr && s_->send_trailing_metadata->empty(); grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes, - is_last_frame_, &s_->stats.outgoing, + is_last_frame_, &s_->call_tracer_wrapper, t_->outbuf.c_slice_buffer()); sfc_upd_.SentData(send_bytes); s_->sending_bytes += send_bytes; @@ -470,8 +469,7 @@ class StreamWriteContext { t_->settings.peer() .allow_true_binary_metadata(), // use_true_binary_metadata t_->settings.peer().max_frame_size(), // max_frame_size - &s_->stats.outgoing // stats - }, + &s_->call_tracer_wrapper}, *s_->send_initial_metadata, t_->outbuf.c_slice_buffer()); grpc_chttp2_reset_ping_clock(t_); write_context_->IncInitialMetadataWrites(); @@ -483,16 +481,32 @@ class StreamWriteContext { grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished, absl::OkStatus(), "send_initial_metadata_finished"); - if (s_->call_tracer) { - grpc_core::HttpAnnotation::WriteStats write_stats; - write_stats.target_write_size = write_context_->target_write_size(); - s_->call_tracer->RecordAnnotation( - grpc_core::HttpAnnotation( - grpc_core::HttpAnnotation::Type::kHeadWritten, - gpr_now(GPR_CLOCK_REALTIME)) - .Add(s_->t->flow_control.stats()) - .Add(s_->flow_control.stats()) - .Add(write_stats)); + if (!grpc_core::IsCallTracerInTransportEnabled()) { + if (s_->call_tracer) { + grpc_core::HttpAnnotation::WriteStats write_stats; + write_stats.target_write_size = write_context_->target_write_size(); + s_->call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation( + grpc_core::HttpAnnotation::Type::kHeadWritten, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s_->t->flow_control.stats()) + .Add(s_->flow_control.stats()) + .Add(write_stats)); + } + } else if (grpc_core::IsTraceRecordCallopsEnabled()) { + auto* call_tracer = + s_->arena->GetContext(); + if (call_tracer != nullptr && call_tracer->IsSampled()) { + grpc_core::HttpAnnotation::WriteStats write_stats; + write_stats.target_write_size = write_context_->target_write_size(); + call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation( + grpc_core::HttpAnnotation::Type::kHeadWritten, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s_->t->flow_control.stats()) + .Add(s_->flow_control.stats()) + .Add(write_stats)); + } } } @@ -503,9 +517,10 @@ class StreamWriteContext { const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate(); if (stream_announce == 0) return; - grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(), - grpc_chttp2_window_update_create( - s_->id, stream_announce, &s_->stats.outgoing)); + grpc_slice_buffer_add( + t_->outbuf.c_slice_buffer(), + grpc_chttp2_window_update_create(s_->id, stream_announce, + &s_->call_tracer_wrapper)); grpc_chttp2_reset_ping_clock(t_); write_context_->IncWindowUpdateWrites(); } @@ -558,12 +573,13 @@ class StreamWriteContext { GRPC_CHTTP2_IF_TRACING(INFO) << "sending trailing_metadata"; if (s_->send_trailing_metadata->empty()) { grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true, - &s_->stats.outgoing, t_->outbuf.c_slice_buffer()); + &s_->call_tracer_wrapper, + t_->outbuf.c_slice_buffer()); } else { t_->hpack_compressor.EncodeHeaders( grpc_core::HPackCompressor::EncodeHeaderOptions{ s_->id, true, t_->settings.peer().allow_true_binary_metadata(), - t_->settings.peer().max_frame_size(), &s_->stats.outgoing}, + t_->settings.peer().max_frame_size(), &s_->call_tracer_wrapper}, *s_->send_trailing_metadata, t_->outbuf.c_slice_buffer()); } write_context_->IncTrailingMetadataWrites(); @@ -627,16 +643,28 @@ class StreamWriteContext { grpc_slice_buffer_add( t_->outbuf.c_slice_buffer(), grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR, - &s_->stats.outgoing)); + &s_->call_tracer_wrapper)); } grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true, absl::OkStatus()); - if (s_->call_tracer) { - s_->call_tracer->RecordAnnotation( - grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd, - gpr_now(GPR_CLOCK_REALTIME)) - .Add(s_->t->flow_control.stats()) - .Add(s_->flow_control.stats())); + if (!grpc_core::IsCallTracerInTransportEnabled()) { + if (s_->call_tracer) { + s_->call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s_->t->flow_control.stats()) + .Add(s_->flow_control.stats())); + } + } else if (grpc_core::IsTraceRecordCallopsEnabled()) { + auto* call_tracer = + s_->arena->GetContext(); + if (call_tracer != nullptr && call_tracer->IsSampled()) { + call_tracer->RecordAnnotation( + grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd, + gpr_now(GPR_CLOCK_REALTIME)) + .Add(s_->t->flow_control.stats()) + .Add(s_->flow_control.stats())); + } } } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 7eb9a20c504..d076b175aa0 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -114,7 +114,7 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_call_status_override_on_cancellation, nullptr, 0, true, true}, {"call_tracer_in_transport", description_call_tracer_in_transport, - additional_constraints_call_tracer_in_transport, nullptr, 0, false, true}, + additional_constraints_call_tracer_in_transport, nullptr, 0, true, true}, {"canary_client_privacy", description_canary_client_privacy, additional_constraints_canary_client_privacy, nullptr, 0, false, false}, {"client_privacy", description_client_privacy, @@ -264,7 +264,7 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_call_status_override_on_cancellation, nullptr, 0, true, true}, {"call_tracer_in_transport", description_call_tracer_in_transport, - additional_constraints_call_tracer_in_transport, nullptr, 0, false, true}, + additional_constraints_call_tracer_in_transport, nullptr, 0, true, true}, {"canary_client_privacy", description_canary_client_privacy, additional_constraints_canary_client_privacy, nullptr, 0, false, false}, {"client_privacy", description_client_privacy, @@ -414,7 +414,7 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_call_status_override_on_cancellation, nullptr, 0, true, true}, {"call_tracer_in_transport", description_call_tracer_in_transport, - additional_constraints_call_tracer_in_transport, nullptr, 0, false, true}, + additional_constraints_call_tracer_in_transport, nullptr, 0, true, true}, {"canary_client_privacy", description_canary_client_privacy, additional_constraints_canary_client_privacy, nullptr, 0, false, false}, {"client_privacy", description_client_privacy, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 88c396e64c1..c5e62a41011 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -59,7 +59,8 @@ namespace grpc_core { #if defined(GRPC_CFSTREAM) #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } -inline bool IsCallTracerInTransportEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT +inline bool IsCallTracerInTransportEnabled() { return true; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } inline bool IsEventEngineClientEnabled() { return false; } @@ -89,7 +90,8 @@ inline bool IsWorkSerializerDispatchEnabled() { return false; } #elif defined(GPR_WINDOWS) #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } -inline bool IsCallTracerInTransportEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT +inline bool IsCallTracerInTransportEnabled() { return true; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_CLIENT @@ -122,7 +124,8 @@ inline bool IsWorkSerializerDispatchEnabled() { return false; } #else #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } -inline bool IsCallTracerInTransportEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT +inline bool IsCallTracerInTransportEnabled() { return true; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } inline bool IsEventEngineClientEnabled() { return false; } diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index 0566ffbab7e..df0c288affe 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -42,6 +42,8 @@ - name: call_status_override_on_cancellation default: true +- name: call_tracer_in_transport + default: true - name: call_v3 default: false - name: canary_client_privacy diff --git a/src/core/telemetry/call_tracer.cc b/src/core/telemetry/call_tracer.cc index 4b70f220395..f09721a0622 100644 --- a/src/core/telemetry/call_tracer.cc +++ b/src/core/telemetry/call_tracer.cc @@ -32,6 +32,15 @@ namespace grpc_core { +CallTracerInterface::TransportByteSize& +CallTracerInterface::TransportByteSize::operator+=( + const CallTracerInterface::TransportByteSize& other) { + framing_bytes += other.framing_bytes; + data_bytes += other.data_bytes; + header_bytes += other.header_bytes; + return *this; +} + // // ServerCallTracerFactory // @@ -42,6 +51,7 @@ ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr; const char* kServerCallTracerFactoryChannelArgName = "grpc.experimental.server_call_tracer_factory"; + } // namespace ServerCallTracerFactory* ServerCallTracerFactory::Get( @@ -139,6 +149,18 @@ class DelegatingClientCallTracer : public ClientCallTracer { tracer->RecordEnd(latency); } } + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override { + for (auto* tracer : tracers_) { + tracer->RecordIncomingBytes(transport_byte_size); + } + } + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override { + for (auto* tracer : tracers_) { + tracer->RecordOutgoingBytes(transport_byte_size); + } + } void RecordAnnotation(absl::string_view annotation) override { for (auto* tracer : tracers_) { tracer->RecordAnnotation(annotation); @@ -271,6 +293,18 @@ class DelegatingServerCallTracer : public ServerCallTracer { tracer->RecordEnd(final_info); } } + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override { + for (auto* tracer : tracers_) { + tracer->RecordIncomingBytes(transport_byte_size); + } + } + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override { + for (auto* tracer : tracers_) { + tracer->RecordOutgoingBytes(transport_byte_size); + } + } void RecordAnnotation(absl::string_view annotation) override { for (auto* tracer : tracers_) { tracer->RecordAnnotation(annotation); diff --git a/src/core/telemetry/call_tracer.h b/src/core/telemetry/call_tracer.h index 3f8e25d75b5..d822768d7e6 100644 --- a/src/core/telemetry/call_tracer.h +++ b/src/core/telemetry/call_tracer.h @@ -110,6 +110,19 @@ class CallTracerInterface : public CallTracerAnnotationInterface { virtual void RecordReceivedDecompressedMessage( const SliceBuffer& recv_decompressed_message) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0; + + struct TransportByteSize { + uint64_t framing_bytes = 0; + uint64_t data_bytes = 0; + uint64_t header_bytes = 0; + + TransportByteSize& operator+=(const TransportByteSize& other); + }; + virtual void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) = 0; + virtual void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) = 0; + // Traces a new TCP transport attempt for this call attempt. Note the TCP // transport may finish tracing and unref the TCP tracer before or after the // call completion in gRPC core. No TCP tracing when null is returned. @@ -144,6 +157,8 @@ class ClientCallTracer : public CallTracerAnnotationInterface { // will be null. virtual void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, + // TODO(roth): Remove this argument when the + // call_tracer_in_transport experiment finishes rolling out. const grpc_transport_stream_stats* transport_stream_stats) = 0; // Should be the last API call to the object. Once invoked, the tracer // library is free to destroy the object. diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 1f15fd4d70f..daffd3e78a2 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -53,6 +53,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/resource_quota/arena.h" @@ -214,17 +215,18 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_)); tags.emplace_back(ClientStatusTagKey(), absl::StatusCodeToString(status_code_)); + uint64_t outgoing_bytes = 0; + uint64_t incoming_bytes = 0; + if (grpc_core::IsCallTracerInTransportEnabled()) { + outgoing_bytes = outgoing_bytes_.load(); + incoming_bytes = incoming_bytes_.load(); + } else if (transport_stream_stats != nullptr) { + outgoing_bytes = transport_stream_stats->outgoing.data_bytes; + incoming_bytes = transport_stream_stats->incoming.data_bytes; + } ::opencensus::stats::Record( - // TODO(yashykt): Recording zeros here when transport_stream_stats is - // nullptr is unfortunate and should be fixed. - {{RpcClientSentBytesPerRpc(), - static_cast(transport_stream_stats != nullptr - ? transport_stream_stats->outgoing.data_bytes - : 0)}, - {RpcClientReceivedBytesPerRpc(), - static_cast(transport_stream_stats != nullptr - ? transport_stream_stats->incoming.data_bytes - : 0)}, + {{RpcClientSentBytesPerRpc(), static_cast(outgoing_bytes)}, + {RpcClientReceivedBytesPerRpc(), static_cast(incoming_bytes)}, {RpcClientServerLatency(), ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))}, {RpcClientRoundtripLatency(), @@ -233,6 +235,16 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: } } +void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordIncomingBytes( + const TransportByteSize& transport_byte_size) { + incoming_bytes_.fetch_add(transport_byte_size.data_bytes); +} + +void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) { + outgoing_bytes_.fetch_add(transport_byte_size.data_bytes); +} + void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel( absl::Status /*cancel_error*/) {} diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 221a551a8c3..15575be5bb0 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -21,6 +21,7 @@ #include +#include #include #include @@ -96,6 +97,10 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; void RecordCancel(grpc_error_handle cancel_error) override; void RecordEnd(const gpr_timespec& /*latency*/) override; void RecordAnnotation(absl::string_view annotation) override; @@ -121,6 +126,12 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { uint64_t sent_message_count_ = 0; // End status code absl::StatusCode status_code_; + // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated + // to promises, after which we can ensure that the transport invokes + // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside + // the call's party. + std::atomic incoming_bytes_{0}; + std::atomic outgoing_bytes_{0}; }; explicit OpenCensusCallTracer(grpc_core::Slice path, grpc_core::Arena* arena, diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc index 24c6e80fb98..e98b21ec0bc 100644 --- a/src/cpp/ext/filters/census/server_call_tracer.cc +++ b/src/cpp/ext/filters/census/server_call_tracer.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -154,6 +155,11 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordEnd(const grpc_call_final_info* final_info) override; + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordAnnotation(absl::string_view annotation) override { if (!context_.Span().IsRecording()) { return; @@ -193,6 +199,12 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { // Buffer needed for grpc_slice to reference it when adding metatdata to // response. char stats_buf_[kMaxServerStatsLen]; + // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated + // to promises, after which we can ensure that the transport invokes + // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside + // the call's party. + std::atomic incoming_bytes_{0}; + std::atomic outgoing_bytes_{0}; }; void OpenCensusServerCallTracer::RecordReceivedInitialMetadata( @@ -236,8 +248,18 @@ void OpenCensusServerCallTracer::RecordSendTrailingMetadata( void OpenCensusServerCallTracer::RecordEnd( const grpc_call_final_info* final_info) { if (OpenCensusStatsEnabled()) { - const uint64_t request_size = GetOutgoingDataSize(final_info); - const uint64_t response_size = GetIncomingDataSize(final_info); + uint64_t outgoing_bytes; + uint64_t incoming_bytes; + if (grpc_core::IsCallTracerInTransportEnabled()) { + outgoing_bytes = outgoing_bytes_.load(); + incoming_bytes = incoming_bytes_.load(); + } else { + // Note: We are incorrectly swapping the two values here, which is + // a pre-existing bug. This code will go away as part of the + // experiment rollout. + outgoing_bytes = GetIncomingDataSize(final_info); + incoming_bytes = GetOutgoingDataSize(final_info); + } double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); std::vector> tags = context_.tags().tags(); @@ -246,8 +268,8 @@ void OpenCensusServerCallTracer::RecordEnd( ServerStatusTagKey(), std::string(StatusCodeToString(final_info->final_status))); ::opencensus::stats::Record( - {{RpcServerSentBytesPerRpc(), static_cast(response_size)}, - {RpcServerReceivedBytesPerRpc(), static_cast(request_size)}, + {{RpcServerSentBytesPerRpc(), static_cast(outgoing_bytes)}, + {RpcServerReceivedBytesPerRpc(), static_cast(incoming_bytes)}, {RpcServerServerLatency(), elapsed_time_ms}, {RpcServerSentMessagesPerRpc(), sent_message_count_}, {RpcServerReceivedMessagesPerRpc(), recv_message_count_}}, @@ -258,6 +280,16 @@ void OpenCensusServerCallTracer::RecordEnd( } } +void OpenCensusServerCallTracer::RecordIncomingBytes( + const TransportByteSize& transport_byte_size) { + incoming_bytes_.fetch_add(transport_byte_size.data_bytes); +} + +void OpenCensusServerCallTracer::RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) { + outgoing_bytes_.fetch_add(transport_byte_size.data_bytes); +} + // // OpenCensusServerCallTracerFactory // diff --git a/src/cpp/ext/otel/BUILD b/src/cpp/ext/otel/BUILD index b6f82fbfb19..4546c2b1306 100644 --- a/src/cpp/ext/otel/BUILD +++ b/src/cpp/ext/otel/BUILD @@ -76,6 +76,7 @@ grpc_cc_library( "//src/core:channel_stack_type", "//src/core:context", "//src/core:error", + "//src/core:experiments", "//src/core:match", "//src/core:metadata_batch", "//src/core:metrics", diff --git a/src/cpp/ext/otel/otel_client_call_tracer.cc b/src/cpp/ext/otel/otel_client_call_tracer.cc index 80202def13e..822a96ea9cf 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.cc +++ b/src/cpp/ext/otel/otel_client_call_tracer.cc @@ -47,6 +47,7 @@ #include "src/core/client_channel/client_channel_filter.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/status_util.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/resource_quota/arena.h" @@ -162,24 +163,37 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: absl::ToDoubleSeconds(absl::Now() - start_time_), labels, opentelemetry::context::Context{}); } + uint64_t outgoing_bytes = 0; + uint64_t incoming_bytes = 0; + if (grpc_core::IsCallTracerInTransportEnabled()) { + outgoing_bytes = outgoing_bytes_.load(); + incoming_bytes = incoming_bytes_.load(); + } else if (transport_stream_stats != nullptr) { + outgoing_bytes = transport_stream_stats->outgoing.data_bytes; + incoming_bytes = transport_stream_stats->incoming.data_bytes; + } if (parent_->otel_plugin_->client_.attempt .sent_total_compressed_message_size != nullptr) { parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size - ->Record(transport_stream_stats != nullptr - ? transport_stream_stats->outgoing.data_bytes - : 0, - labels, opentelemetry::context::Context{}); + ->Record(outgoing_bytes, labels, opentelemetry::context::Context{}); } if (parent_->otel_plugin_->client_.attempt .rcvd_total_compressed_message_size != nullptr) { parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size - ->Record(transport_stream_stats != nullptr - ? transport_stream_stats->incoming.data_bytes - : 0, - labels, opentelemetry::context::Context{}); + ->Record(incoming_bytes, labels, opentelemetry::context::Context{}); } } +void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: + RecordIncomingBytes(const TransportByteSize& transport_byte_size) { + incoming_bytes_.fetch_add(transport_byte_size.data_bytes); +} + +void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: + RecordOutgoingBytes(const TransportByteSize& transport_byte_size) { + outgoing_bytes_.fetch_add(transport_byte_size.data_bytes); +} + void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel( absl::Status /*cancel_error*/) {} diff --git a/src/cpp/ext/otel/otel_client_call_tracer.h b/src/cpp/ext/otel/otel_client_call_tracer.h index 99a5f1c4602..ed435311b20 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.h +++ b/src/cpp/ext/otel/otel_client_call_tracer.h @@ -84,8 +84,12 @@ class OpenTelemetryPluginImpl::ClientCallTracer void RecordReceivedDecompressedMessage( const grpc_core::SliceBuffer& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( - absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/, + absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; void RecordCancel(grpc_error_handle cancel_error) override; void RecordEnd(const gpr_timespec& /*latency*/) override; void RecordAnnotation(absl::string_view /*annotation*/) override; @@ -109,6 +113,12 @@ class OpenTelemetryPluginImpl::ClientCallTracer std::vector> injected_labels_from_plugin_options_; bool is_trailers_only_ = false; + // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated + // to promises, after which we can ensure that the transport invokes + // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside + // the call's party. + std::atomic incoming_bytes_{0}; + std::atomic outgoing_bytes_{0}; }; ClientCallTracer( diff --git a/src/cpp/ext/otel/otel_server_call_tracer.cc b/src/cpp/ext/otel/otel_server_call_tracer.cc index 28c7c6d7a7b..6fd4785672d 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.cc +++ b/src/cpp/ext/otel/otel_server_call_tracer.cc @@ -37,6 +37,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/status_util.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -123,16 +124,30 @@ void OpenTelemetryPluginImpl::ServerCallTracer::RecordEnd( if (otel_plugin_->server_.call.sent_total_compressed_message_size != nullptr) { otel_plugin_->server_.call.sent_total_compressed_message_size->Record( - final_info->stats.transport_stream_stats.outgoing.data_bytes, labels, - opentelemetry::context::Context{}); + grpc_core::IsCallTracerInTransportEnabled() + ? outgoing_bytes_.load() + : final_info->stats.transport_stream_stats.outgoing.data_bytes, + labels, opentelemetry::context::Context{}); } if (otel_plugin_->server_.call.rcvd_total_compressed_message_size != nullptr) { otel_plugin_->server_.call.rcvd_total_compressed_message_size->Record( - final_info->stats.transport_stream_stats.incoming.data_bytes, labels, - opentelemetry::context::Context{}); + grpc_core::IsCallTracerInTransportEnabled() + ? incoming_bytes_.load() + : final_info->stats.transport_stream_stats.incoming.data_bytes, + labels, opentelemetry::context::Context{}); } } +void OpenTelemetryPluginImpl::ServerCallTracer::RecordIncomingBytes( + const TransportByteSize& transport_byte_size) { + incoming_bytes_.fetch_add(transport_byte_size.data_bytes); +} + +void OpenTelemetryPluginImpl::ServerCallTracer::RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) { + outgoing_bytes_.fetch_add(transport_byte_size.data_bytes); +} + } // namespace internal } // namespace grpc diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h index 6d7a7ada915..9d13d669b6d 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.h +++ b/src/cpp/ext/otel/otel_server_call_tracer.h @@ -100,6 +100,11 @@ class OpenTelemetryPluginImpl::ServerCallTracer void RecordEnd(const grpc_call_final_info* final_info) override; + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordAnnotation(absl::string_view /*annotation*/) override { // Not implemented } @@ -131,6 +136,12 @@ class OpenTelemetryPluginImpl::ServerCallTracer injected_labels_from_plugin_options_; OpenTelemetryPluginImpl* otel_plugin_; std::shared_ptr scope_config_; + // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated + // to promises, after which we can ensure that the transport invokes + // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside + // the call's party. + std::atomic incoming_bytes_{0}; + std::atomic outgoing_bytes_{0}; }; } // namespace internal diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc index 9d9ca694be1..496f678f04e 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -28,6 +28,7 @@ #include +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/slice/slice.h" namespace grpc_observability { @@ -273,20 +274,23 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: for (const auto& label : labels_from_peer_) { context_.Labels().emplace_back(label); } - RecordDoubleMetric( - kRpcClientSentBytesPerRpcMeasureName, - static_cast(transport_stream_stats != nullptr - ? transport_stream_stats->outgoing.data_bytes - : 0), - context_.Labels(), parent_->identifier_, parent_->registered_method_, - /*include_exchange_labels=*/true); - RecordDoubleMetric( - kRpcClientReceivedBytesPerRpcMeasureName, - static_cast(transport_stream_stats != nullptr - ? transport_stream_stats->incoming.data_bytes - : 0), - context_.Labels(), parent_->identifier_, parent_->registered_method_, - /*include_exchange_labels=*/true); + uint64_t incoming_bytes = 0; + uint64_t outgoing_bytes = 0; + if (grpc_core::IsCallTracerInTransportEnabled()) { + incoming_bytes = incoming_bytes_.load(); + outgoing_bytes = outgoing_bytes_.load(); + } else if (transport_stream_stats != nullptr) { + incoming_bytes = transport_stream_stats->incoming.data_bytes; + outgoing_bytes = transport_stream_stats->outgoing.data_bytes; + } + RecordDoubleMetric(kRpcClientSentBytesPerRpcMeasureName, + static_cast(outgoing_bytes), context_.Labels(), + parent_->identifier_, parent_->registered_method_, + /*include_exchange_labels=*/true); + RecordDoubleMetric(kRpcClientReceivedBytesPerRpcMeasureName, + static_cast(incoming_bytes), context_.Labels(), + parent_->identifier_, parent_->registered_method_, + /*include_exchange_labels=*/true); RecordDoubleMetric(kRpcClientServerLatencyMeasureName, absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)), context_.Labels(), parent_->identifier_, @@ -302,6 +306,16 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: /*include_exchange_labels=*/true); } +void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: + RecordIncomingBytes(const TransportByteSize& transport_byte_size) { + incoming_bytes_.fetch_add(transport_byte_size.data_bytes); +} + +void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: + RecordOutgoingBytes(const TransportByteSize& transport_byte_size) { + outgoing_bytes_.fetch_add(transport_byte_size.data_bytes); +} + void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: RecordCancel(absl::Status /*cancel_error*/) {} diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h index fa32340eea0..8c251268fb8 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h @@ -17,6 +17,7 @@ #include +#include #include #include "absl/base/thread_annotations.h" @@ -69,6 +70,10 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; + void RecordIncomingBytes( + const TransportByteSize& transport_byte_size) override; + void RecordOutgoingBytes( + const TransportByteSize& transport_byte_size) override; void RecordCancel(grpc_error_handle cancel_error) override; void RecordEnd(const gpr_timespec& /*latency*/) override; void RecordAnnotation(absl::string_view annotation) override; @@ -97,6 +102,12 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { optional_labels_array_; std::vector