[CallTracer] report transport byte counts directly to `CallTracer`

Instead of passing the transport byte counts back up through the filter
stack to be reported to the `CallTracer`, we now have the transport
pass the transport byte counts directly to the `CallTracer` itself.
This will eventually allow us to avoid unnecessarily storing these byte
counts in cases where no `CallTracer` actually cares about the data, which
will reduce per-call memory.  (In the short term, it actually increases
memory usage, but we can separately do some work to avoid the memory
usage in the transport by removing the `grpc_transport_stream_stats`
struct from the legacy filter API.)

This is a prereq for supporting `CallTracer` in the new call v3 stack,
which does not include the transport byte counts as part of the
receieve-trailing-metadata hook, unlike the legacy filter stack.

This change is controlled by the `call_tracer_in_transport` experiment,
which is enabled by default.

As part of this experiment, we also fix a couple of related bugs:
- On the client side, the chttp2 transport was incorrectly adding
  annotations to the parent `ClientCallTracer` instead of the
  `CallAttemptTracer`.
- The OpenCensus `ServerCallTracer` was incorrectly swapping the values
  of sent and received bytes.

PiperOrigin-RevId: 650728181
pull/37035/head
Mark D. Roth 5 months ago committed by Copybara-Service
parent 4f1e905e0e
commit 952d6276b4
  1. 2
      BUILD
  2. 2
      requirements.bazel.txt
  3. 54
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 7
      src/core/ext/transport/chttp2/transport/frame_data.cc
  5. 3
      src/core/ext/transport/chttp2/transport/frame_data.h
  6. 15
      src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
  7. 8
      src/core/ext/transport/chttp2/transport/frame_rst_stream.h
  8. 10
      src/core/ext/transport/chttp2/transport/frame_window_update.cc
  9. 4
      src/core/ext/transport/chttp2/transport/frame_window_update.h
  10. 4
      src/core/ext/transport/chttp2/transport/hpack_encoder.cc
  11. 3
      src/core/ext/transport/chttp2/transport/hpack_encoder.h
  12. 49
      src/core/ext/transport/chttp2/transport/internal.h
  13. 19
      src/core/ext/transport/chttp2/transport/parsing.cc
  14. 54
      src/core/ext/transport/chttp2/transport/writing.cc
  15. 6
      src/core/lib/experiments/experiments.cc
  16. 9
      src/core/lib/experiments/experiments.h
  17. 2
      src/core/lib/experiments/rollouts.yaml
  18. 34
      src/core/telemetry/call_tracer.cc
  19. 15
      src/core/telemetry/call_tracer.h
  20. 32
      src/cpp/ext/filters/census/client_filter.cc
  21. 11
      src/cpp/ext/filters/census/open_census_call_tracer.h
  22. 40
      src/cpp/ext/filters/census/server_call_tracer.cc
  23. 1
      src/cpp/ext/otel/BUILD
  24. 30
      src/cpp/ext/otel/otel_client_call_tracer.cc
  25. 12
      src/cpp/ext/otel/otel_client_call_tracer.h
  26. 23
      src/cpp/ext/otel/otel_server_call_tracer.cc
  27. 11
      src/cpp/ext/otel/otel_server_call_tracer.h
  28. 38
      src/python/grpcio_observability/grpc_observability/client_call_tracer.cc
  29. 11
      src/python/grpcio_observability/grpc_observability/client_call_tracer.h
  30. 26
      src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
  31. 14
      src/python/grpcio_observability/grpc_observability/server_call_tracer.h
  32. 6
      src/python/grpcio_observability/make_grpcio_observability.py
  33. 2
      src/python/grpcio_observability/observability_lib_deps.py
  34. 127
      test/core/end2end/tests/http2_stats.cc
  35. 8
      test/core/test_util/fake_stats_plugin.h
  36. 53
      test/core/transport/chttp2/hpack_encoder_test.cc
  37. 14
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
  38. 48
      test/cpp/microbenchmarks/bm_chttp2_hpack.cc

@ -2916,6 +2916,7 @@ grpc_cc_library(
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:experiments",
"//src/core:logging_filter",
"//src/core:metadata_batch",
"//src/core:slice",
@ -4686,6 +4687,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"call_tracer",
"chttp2_bin_encoder",
"chttp2_legacy_frame",
"chttp2_varint",

@ -7,7 +7,7 @@ oauth2client==4.1.0
requests==2.25.1
urllib3==1.26.18
chardet==3.0.4
certifi==2024.7.4
certifi==2023.7.22
idna==2.7
gevent==22.08.0
zope.event==4.5.0

@ -257,6 +257,7 @@ grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
namespace grpc_core {
namespace {
// Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds
// the passed in reference to \a t until it's moved into Fn.
template <void (*Fn)(RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle)>
@ -272,13 +273,12 @@ grpc_closure* InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,
t.release(), nullptr);
return c;
}
} // namespace
namespace {
TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
nullptr;
bool test_only_disable_transient_failure_state_notification = false;
} // namespace
void TestOnlySetGlobalHttp2TransportInitCallback(
@ -361,6 +361,34 @@ std::string HttpAnnotation::ToString() const {
return s;
}
void Chttp2CallTracerWrapper::RecordIncomingBytes(
const CallTracerInterface::TransportByteSize& transport_byte_size) {
// Update legacy API.
stream_->stats.incoming.framing_bytes += transport_byte_size.framing_bytes;
stream_->stats.incoming.data_bytes += transport_byte_size.data_bytes;
stream_->stats.incoming.header_bytes += transport_byte_size.header_bytes;
// Update new API.
if (!IsCallTracerInTransportEnabled()) return;
auto* call_tracer = stream_->arena->GetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordIncomingBytes(transport_byte_size);
}
}
void Chttp2CallTracerWrapper::RecordOutgoingBytes(
const CallTracerInterface::TransportByteSize& transport_byte_size) {
// Update legacy API.
stream_->stats.outgoing.framing_bytes += transport_byte_size.framing_bytes;
stream_->stats.outgoing.data_bytes += transport_byte_size.data_bytes;
stream_->stats.outgoing.header_bytes +=
transport_byte_size.header_bytes; // Update new API.
if (!IsCallTracerInTransportEnabled()) return;
auto* call_tracer = stream_->arena->GetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordOutgoingBytes(transport_byte_size);
}
}
} // namespace grpc_core
//
@ -814,7 +842,8 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
return refcount;
}()),
arena(arena),
flow_control(&t->flow_control) {
flow_control(&t->flow_control),
call_tracer_wrapper(this) {
t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
if (server_data) {
id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
@ -1344,7 +1373,9 @@ static void perform_stream_op_locked(void* stream_op,
grpc_chttp2_transport* t = s->t.get();
s->traced = op->is_traced;
if (!grpc_core::IsCallTracerInTransportEnabled()) {
s->call_tracer = CallTracerIfSampled(s);
}
s->tcp_tracer = TcpTracerIfSampled(s);
if (GRPC_TRACE_FLAG_ENABLED(http)) {
LOG(INFO) << "perform_stream_op_locked[s=" << s << "; op=" << op
@ -1375,6 +1406,7 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->send_initial_metadata) {
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s->call_tracer != nullptr) {
s->call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
@ -1382,6 +1414,17 @@ static void perform_stream_op_locked(void* stream_op,
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
} else if (grpc_core::IsTraceRecordCallopsEnabled()) {
auto* call_tracer =
s->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
}
if (t->is_client && t->channelz_socket != nullptr) {
t->channelz_socket->RecordStreamStartedFromLocal();
}
@ -1464,9 +1507,8 @@ static void perform_stream_op_locked(void* stream_op,
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
s->stats.outgoing.data_bytes +=
op_payload->send_message.send_message->Length();
s->call_tracer_wrapper.RecordOutgoingBytes(
{GRPC_HEADER_SIZE_IN_BYTES, len, 0});
s->next_message_end_offset =
s->flow_controlled_bytes_written +
static_cast<int64_t>(s->flow_controlled_buffer.length) +

@ -55,7 +55,7 @@ absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats* stats,
grpc_core::CallTracerInterface* call_tracer,
grpc_slice_buffer* outbuf) {
grpc_slice hdr;
uint8_t* p;
@ -77,7 +77,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
stats->framing_bytes += header_size;
call_tracer->RecordOutgoingBytes({header_size, 0, 0});
}
grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
@ -126,8 +126,7 @@ grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
if (min_progress_size != nullptr) *min_progress_size = 0;
if (stream_out != nullptr) {
s->stats.incoming.framing_bytes += 5;
s->stats.incoming.data_bytes += length;
s->call_tracer_wrapper.RecordIncomingBytes({5, length, 0});
grpc_slice_buffer_move_first_into_buffer(slices, 5, header);
grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer());
}

@ -33,6 +33,7 @@
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
// start processing a new data frame
absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
@ -49,7 +50,7 @@ grpc_error_handle grpc_chttp2_data_parser_parse(void* parser,
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats* stats,
grpc_core::CallTracerInterface* call_tracer,
grpc_slice_buffer* outbuf);
grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(

@ -39,11 +39,13 @@
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats) {
grpc_slice grpc_chttp2_rst_stream_create(
uint32_t id, uint32_t code, grpc_core::CallTracerInterface* call_tracer) {
static const size_t frame_size = 13;
grpc_slice slice = GRPC_SLICE_MALLOC(frame_size);
if (stats != nullptr) stats->framing_bytes += frame_size;
if (call_tracer != nullptr) {
call_tracer->RecordOutgoingBytes({frame_size, 0, 0});
}
uint8_t* p = GRPC_SLICE_START_PTR(slice);
// Frame size.
@ -70,10 +72,10 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
void grpc_chttp2_add_rst_stream_to_next_write(
grpc_chttp2_transport* t, uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats) {
grpc_core::CallTracerInterface* call_tracer) {
t->num_pending_induced_frames++;
grpc_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, code, stats));
grpc_chttp2_rst_stream_create(id, code, call_tracer));
}
grpc_error_handle grpc_chttp2_rst_stream_parser_begin_frame(
@ -102,7 +104,8 @@ grpc_error_handle grpc_chttp2_rst_stream_parser_parse(void* parser,
cur++;
p->byte++;
}
s->stats.incoming.framing_bytes += static_cast<uint64_t>(end - cur);
uint64_t framing_bytes = static_cast<uint64_t>(end - cur);
s->call_tracer_wrapper.RecordIncomingBytes({framing_bytes, 0, 0});
if (p->byte == 4) {
CHECK(is_last);

@ -27,20 +27,22 @@
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
struct grpc_chttp2_rst_stream_parser {
uint8_t byte;
uint8_t reason_bytes[4];
};
grpc_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code,
grpc_transport_one_way_stats* stats);
grpc_slice grpc_chttp2_rst_stream_create(
uint32_t stream_id, uint32_t code,
grpc_core::CallTracerInterface* call_tracer);
// Adds RST_STREAM frame to t->qbuf (buffer for the next write). Should be
// called when we want to add RST_STREAM and we are not in
// write_action_begin_locked.
void grpc_chttp2_add_rst_stream_to_next_write(
grpc_chttp2_transport* t, uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats);
grpc_core::CallTracerInterface* call_tracer);
grpc_error_handle grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags);

@ -32,10 +32,13 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
grpc_slice grpc_chttp2_window_update_create(
uint32_t id, uint32_t window_delta, grpc_transport_one_way_stats* stats) {
uint32_t id, uint32_t window_delta,
grpc_core::CallTracerInterface* call_tracer) {
static const size_t frame_size = 13;
grpc_slice slice = GRPC_SLICE_MALLOC(frame_size);
stats->framing_bytes += frame_size;
if (call_tracer != nullptr) {
call_tracer->RecordOutgoingBytes({frame_size, 0, 0});
}
uint8_t* p = GRPC_SLICE_START_PTR(slice);
CHECK(window_delta);
@ -84,7 +87,8 @@ grpc_error_handle grpc_chttp2_window_update_parser_parse(
}
if (s != nullptr) {
s->stats.incoming.framing_bytes += static_cast<uint32_t>(end - cur);
uint64_t framing_bytes = static_cast<uint32_t>(end - cur);
s->call_tracer_wrapper.RecordIncomingBytes({framing_bytes, 0, 0});
}
if (p->byte == 4) {

@ -27,6 +27,7 @@
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
struct grpc_chttp2_window_update_parser {
uint8_t byte;
@ -34,7 +35,8 @@ struct grpc_chttp2_window_update_parser {
uint32_t amount;
};
grpc_slice grpc_chttp2_window_update_create(
uint32_t id, uint32_t window_delta, grpc_transport_one_way_stats* stats);
uint32_t id, uint32_t window_delta,
grpc_core::CallTracerInterface* call_tracer);
grpc_error_handle grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser* parser, uint32_t length, uint8_t flags);

@ -87,7 +87,7 @@ void HPackCompressor::Frame(const EncodeHeaderOptions& options,
if (options.is_end_of_stream) {
flags |= GRPC_CHTTP2_DATA_FLAG_END_STREAM;
}
options.stats->header_bytes += raw.Length();
options.call_tracer->RecordOutgoingBytes({0, 0, raw.Length()});
while (frame_type == GRPC_CHTTP2_FRAME_HEADER || raw.Length() > 0) {
// per the HTTP/2 spec:
// A HEADERS frame without the END_HEADERS flag set MUST be followed by
@ -101,7 +101,7 @@ void HPackCompressor::Frame(const EncodeHeaderOptions& options,
}
FillHeader(grpc_slice_buffer_tiny_add(output, kHeadersFrameHeaderSize),
frame_type, options.stream_id, len, flags);
options.stats->framing_bytes += kHeadersFrameHeaderSize;
options.call_tracer->RecordOutgoingBytes({kHeadersFrameHeaderSize, 0, 0});
grpc_slice_buffer_move_first(raw.c_slice_buffer(), len, output);
frame_type = GRPC_CHTTP2_FRAME_CONTINUATION;

@ -42,6 +42,7 @@
#include "src/core/lib/transport/metadata_compression_traits.h"
#include "src/core/lib/transport/timeout_encoding.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
namespace grpc_core {
@ -353,7 +354,7 @@ class HPackCompressor {
bool is_end_of_stream;
bool use_true_binary_metadata;
size_t max_frame_size;
grpc_transport_one_way_stats* stats;
CallTracerInterface* call_tracer;
};
template <typename HeaderSet>

@ -554,6 +554,51 @@ typedef enum {
GRPC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
namespace grpc_core {
// A CallTracer wrapper that updates both the legacy and new APIs for
// transport byte sizes.
// TODO(ctiller): This can go away as part of removing the
// grpc_transport_stream_stats struct.
class Chttp2CallTracerWrapper final : public CallTracerInterface {
public:
explicit Chttp2CallTracerWrapper(grpc_chttp2_stream* stream)
: stream_(stream) {}
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
// Everything else is a no-op.
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void RecordAnnotation(absl::string_view /*annotation*/) override {}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
private:
grpc_chttp2_stream* stream_;
};
} // namespace grpc_core
struct grpc_chttp2_stream {
grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
const void* server_data, grpc_core::Arena* arena);
@ -653,7 +698,11 @@ struct grpc_chttp2_stream {
/// Number of times written
int64_t write_counter = 0;
grpc_core::Chttp2CallTracerWrapper call_tracer_wrapper;
/// Only set when enabled.
// TODO(roth): Remove this when the call_tracer_in_transport
// experiment finishes rolling out.
grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
/// Only set when enabled.

@ -555,7 +555,7 @@ static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) {
return init_non_header_skip_frame_parser(t);
}
s->received_bytes += t->incoming_frame_size;
s->stats.incoming.framing_bytes += 9;
s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
if (s->read_closed) {
return init_non_header_skip_frame_parser(t);
}
@ -574,7 +574,7 @@ error_handler:
absl_status_to_grpc_error(status));
grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing);
&s->call_tracer_wrapper);
return init_non_header_skip_frame_parser(t);
} else {
return absl_status_to_grpc_error(status);
@ -725,7 +725,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
t->incoming_stream = s;
}
DCHECK_NE(s, nullptr);
s->stats.incoming.framing_bytes += 9;
s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
if (GPR_UNLIKELY(s->read_closed)) {
GRPC_CHTTP2_IF_TRACING(ERROR)
<< "skipping already closed grpc_chttp2_stream header";
@ -796,7 +796,7 @@ static grpc_error_handle init_window_update_frame_parser(
}
return init_non_header_skip_frame_parser(t);
}
s->stats.incoming.framing_bytes += 9;
s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
}
t->parser = grpc_chttp2_transport::Parser{
"window_update", grpc_chttp2_window_update_parser_parse,
@ -822,7 +822,7 @@ static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) {
if (!t->incoming_stream) {
return init_non_header_skip_frame_parser(t);
}
s->stats.incoming.framing_bytes += 9;
s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
t->parser = grpc_chttp2_transport::Parser{
"rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream};
if (!t->is_client && grpc_core::IsRstpitEnabled()) {
@ -918,7 +918,7 @@ static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) {
grpc_chttp2_transport* t = s->t.get();
if (!s->write_closed) {
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
&s->call_tracer_wrapper);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(t, s, true, true, absl::OkStatus());
}
@ -933,9 +933,12 @@ grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
auto* parser = static_cast<grpc_core::HPackParser*>(hpack_parser);
grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
if (s != nullptr) {
s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice);
s->call_tracer_wrapper.RecordIncomingBytes(
{0, 0, GRPC_SLICE_LENGTH(slice)});
call_tracer =
s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
grpc_core::IsCallTracerInTransportEnabled()
? s->arena->GetContext<grpc_core::CallTracerInterface>()
: s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
}
grpc_error_handle error = parser->Parse(
slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer);

@ -296,10 +296,9 @@ class WriteContext {
uint32_t transport_announce = t_->flow_control.MaybeSendUpdate(
t_->outbuf.c_slice_buffer()->count > 0);
if (transport_announce) {
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(
0, transport_announce, &throwaway_stats));
grpc_slice_buffer_add(
t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(0, transport_announce, nullptr));
grpc_chttp2_reset_ping_clock(t_);
}
}
@ -411,7 +410,7 @@ class DataSendContext {
s_->send_trailing_metadata != nullptr &&
s_->send_trailing_metadata->empty();
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing,
is_last_frame_, &s_->call_tracer_wrapper,
t_->outbuf.c_slice_buffer());
sfc_upd_.SentData(send_bytes);
s_->sending_bytes += send_bytes;
@ -470,8 +469,7 @@ class StreamWriteContext {
t_->settings.peer()
.allow_true_binary_metadata(), // use_true_binary_metadata
t_->settings.peer().max_frame_size(), // max_frame_size
&s_->stats.outgoing // stats
},
&s_->call_tracer_wrapper},
*s_->send_initial_metadata, t_->outbuf.c_slice_buffer());
grpc_chttp2_reset_ping_clock(t_);
write_context_->IncInitialMetadataWrites();
@ -483,6 +481,7 @@ class StreamWriteContext {
grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished,
absl::OkStatus(),
"send_initial_metadata_finished");
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s_->call_tracer) {
grpc_core::HttpAnnotation::WriteStats write_stats;
write_stats.target_write_size = write_context_->target_write_size();
@ -494,6 +493,21 @@ class StreamWriteContext {
.Add(s_->flow_control.stats())
.Add(write_stats));
}
} else if (grpc_core::IsTraceRecordCallopsEnabled()) {
auto* call_tracer =
s_->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
grpc_core::HttpAnnotation::WriteStats write_stats;
write_stats.target_write_size = write_context_->target_write_size();
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(
grpc_core::HttpAnnotation::Type::kHeadWritten,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s_->t->flow_control.stats())
.Add(s_->flow_control.stats())
.Add(write_stats));
}
}
}
void FlushWindowUpdates() {
@ -503,9 +517,10 @@ class StreamWriteContext {
const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate();
if (stream_announce == 0) return;
grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(
s_->id, stream_announce, &s_->stats.outgoing));
grpc_slice_buffer_add(
t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(s_->id, stream_announce,
&s_->call_tracer_wrapper));
grpc_chttp2_reset_ping_clock(t_);
write_context_->IncWindowUpdateWrites();
}
@ -558,12 +573,13 @@ class StreamWriteContext {
GRPC_CHTTP2_IF_TRACING(INFO) << "sending trailing_metadata";
if (s_->send_trailing_metadata->empty()) {
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
&s_->stats.outgoing, t_->outbuf.c_slice_buffer());
&s_->call_tracer_wrapper,
t_->outbuf.c_slice_buffer());
} else {
t_->hpack_compressor.EncodeHeaders(
grpc_core::HPackCompressor::EncodeHeaderOptions{
s_->id, true, t_->settings.peer().allow_true_binary_metadata(),
t_->settings.peer().max_frame_size(), &s_->stats.outgoing},
t_->settings.peer().max_frame_size(), &s_->call_tracer_wrapper},
*s_->send_trailing_metadata, t_->outbuf.c_slice_buffer());
}
write_context_->IncTrailingMetadataWrites();
@ -627,10 +643,11 @@ class StreamWriteContext {
grpc_slice_buffer_add(
t_->outbuf.c_slice_buffer(),
grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR,
&s_->stats.outgoing));
&s_->call_tracer_wrapper));
}
grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
absl::OkStatus());
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s_->call_tracer) {
s_->call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
@ -638,6 +655,17 @@ class StreamWriteContext {
.Add(s_->t->flow_control.stats())
.Add(s_->flow_control.stats()));
}
} else if (grpc_core::IsTraceRecordCallopsEnabled()) {
auto* call_tracer =
s_->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s_->t->flow_control.stats())
.Add(s_->flow_control.stats()));
}
}
}
WriteContext* const write_context_;

@ -114,7 +114,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, false, true},
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,
@ -264,7 +264,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, false, true},
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,
@ -414,7 +414,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, false, true},
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,

@ -59,7 +59,8 @@ namespace grpc_core {
#if defined(GRPC_CFSTREAM)
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallTracerInTransportEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT
inline bool IsCallTracerInTransportEnabled() { return true; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
inline bool IsEventEngineClientEnabled() { return false; }
@ -89,7 +90,8 @@ inline bool IsWorkSerializerDispatchEnabled() { return false; }
#elif defined(GPR_WINDOWS)
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallTracerInTransportEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT
inline bool IsCallTracerInTransportEnabled() { return true; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_CLIENT
@ -122,7 +124,8 @@ inline bool IsWorkSerializerDispatchEnabled() { return false; }
#else
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallTracerInTransportEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_TRACER_IN_TRANSPORT
inline bool IsCallTracerInTransportEnabled() { return true; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
inline bool IsEventEngineClientEnabled() { return false; }

@ -42,6 +42,8 @@
- name: call_status_override_on_cancellation
default: true
- name: call_tracer_in_transport
default: true
- name: call_v3
default: false
- name: canary_client_privacy

@ -32,6 +32,15 @@
namespace grpc_core {
CallTracerInterface::TransportByteSize&
CallTracerInterface::TransportByteSize::operator+=(
const CallTracerInterface::TransportByteSize& other) {
framing_bytes += other.framing_bytes;
data_bytes += other.data_bytes;
header_bytes += other.header_bytes;
return *this;
}
//
// ServerCallTracerFactory
//
@ -42,6 +51,7 @@ ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr;
const char* kServerCallTracerFactoryChannelArgName =
"grpc.experimental.server_call_tracer_factory";
} // namespace
ServerCallTracerFactory* ServerCallTracerFactory::Get(
@ -139,6 +149,18 @@ class DelegatingClientCallTracer : public ClientCallTracer {
tracer->RecordEnd(latency);
}
}
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {
for (auto* tracer : tracers_) {
tracer->RecordIncomingBytes(transport_byte_size);
}
}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {
for (auto* tracer : tracers_) {
tracer->RecordOutgoingBytes(transport_byte_size);
}
}
void RecordAnnotation(absl::string_view annotation) override {
for (auto* tracer : tracers_) {
tracer->RecordAnnotation(annotation);
@ -271,6 +293,18 @@ class DelegatingServerCallTracer : public ServerCallTracer {
tracer->RecordEnd(final_info);
}
}
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {
for (auto* tracer : tracers_) {
tracer->RecordIncomingBytes(transport_byte_size);
}
}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {
for (auto* tracer : tracers_) {
tracer->RecordOutgoingBytes(transport_byte_size);
}
}
void RecordAnnotation(absl::string_view annotation) override {
for (auto* tracer : tracers_) {
tracer->RecordAnnotation(annotation);

@ -110,6 +110,19 @@ class CallTracerInterface : public CallTracerAnnotationInterface {
virtual void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
struct TransportByteSize {
uint64_t framing_bytes = 0;
uint64_t data_bytes = 0;
uint64_t header_bytes = 0;
TransportByteSize& operator+=(const TransportByteSize& other);
};
virtual void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) = 0;
virtual void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) = 0;
// Traces a new TCP transport attempt for this call attempt. Note the TCP
// transport may finish tracing and unref the TCP tracer before or after the
// call completion in gRPC core. No TCP tracing when null is returned.
@ -144,6 +157,8 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
// will be null.
virtual void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
// TODO(roth): Remove this argument when the
// call_tracer_in_transport experiment finishes rolling out.
const grpc_transport_stream_stats* transport_stream_stats) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.

@ -53,6 +53,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
@ -214,17 +215,18 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
tags.emplace_back(ClientStatusTagKey(),
absl::StatusCodeToString(status_code_));
uint64_t outgoing_bytes = 0;
uint64_t incoming_bytes = 0;
if (grpc_core::IsCallTracerInTransportEnabled()) {
outgoing_bytes = outgoing_bytes_.load();
incoming_bytes = incoming_bytes_.load();
} else if (transport_stream_stats != nullptr) {
outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
incoming_bytes = transport_stream_stats->incoming.data_bytes;
}
::opencensus::stats::Record(
// TODO(yashykt): Recording zeros here when transport_stream_stats is
// nullptr is unfortunate and should be fixed.
{{RpcClientSentBytesPerRpc(),
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0)},
{RpcClientReceivedBytesPerRpc(),
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0)},
{{RpcClientSentBytesPerRpc(), static_cast<double>(outgoing_bytes)},
{RpcClientReceivedBytesPerRpc(), static_cast<double>(incoming_bytes)},
{RpcClientServerLatency(),
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))},
{RpcClientRoundtripLatency(),
@ -233,6 +235,16 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordIncomingBytes(
const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
absl::Status /*cancel_error*/) {}

@ -21,6 +21,7 @@
#include <stdint.h>
#include <atomic>
#include <memory>
#include <string>
@ -96,6 +97,10 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /*latency*/) override;
void RecordAnnotation(absl::string_view annotation) override;
@ -121,6 +126,12 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
uint64_t sent_message_count_ = 0;
// End status code
absl::StatusCode status_code_;
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
explicit OpenCensusCallTracer(grpc_core::Slice path, grpc_core::Arena* arena,

@ -21,6 +21,7 @@
#include <stdint.h>
#include <string.h>
#include <atomic>
#include <memory>
#include <string>
#include <utility>
@ -154,6 +155,11 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordAnnotation(absl::string_view annotation) override {
if (!context_.Span().IsRecording()) {
return;
@ -193,6 +199,12 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
// Buffer needed for grpc_slice to reference it when adding metatdata to
// response.
char stats_buf_[kMaxServerStatsLen];
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
void OpenCensusServerCallTracer::RecordReceivedInitialMetadata(
@ -236,8 +248,18 @@ void OpenCensusServerCallTracer::RecordSendTrailingMetadata(
void OpenCensusServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
if (OpenCensusStatsEnabled()) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
uint64_t outgoing_bytes;
uint64_t incoming_bytes;
if (grpc_core::IsCallTracerInTransportEnabled()) {
outgoing_bytes = outgoing_bytes_.load();
incoming_bytes = incoming_bytes_.load();
} else {
// Note: We are incorrectly swapping the two values here, which is
// a pre-existing bug. This code will go away as part of the
// experiment rollout.
outgoing_bytes = GetIncomingDataSize(final_info);
incoming_bytes = GetOutgoingDataSize(final_info);
}
double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
@ -246,8 +268,8 @@ void OpenCensusServerCallTracer::RecordEnd(
ServerStatusTagKey(),
std::string(StatusCodeToString(final_info->final_status)));
::opencensus::stats::Record(
{{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
{RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
{{RpcServerSentBytesPerRpc(), static_cast<double>(outgoing_bytes)},
{RpcServerReceivedBytesPerRpc(), static_cast<double>(incoming_bytes)},
{RpcServerServerLatency(), elapsed_time_ms},
{RpcServerSentMessagesPerRpc(), sent_message_count_},
{RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
@ -258,6 +280,16 @@ void OpenCensusServerCallTracer::RecordEnd(
}
}
void OpenCensusServerCallTracer::RecordIncomingBytes(
const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenCensusServerCallTracer::RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
//
// OpenCensusServerCallTracerFactory
//

@ -76,6 +76,7 @@ grpc_cc_library(
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:experiments",
"//src/core:match",
"//src/core:metadata_batch",
"//src/core:metrics",

@ -47,6 +47,7 @@
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
@ -162,22 +163,35 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
opentelemetry::context::Context{});
}
uint64_t outgoing_bytes = 0;
uint64_t incoming_bytes = 0;
if (grpc_core::IsCallTracerInTransportEnabled()) {
outgoing_bytes = outgoing_bytes_.load();
incoming_bytes = incoming_bytes_.load();
} else if (transport_stream_stats != nullptr) {
outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
incoming_bytes = transport_stream_stats->incoming.data_bytes;
}
if (parent_->otel_plugin_->client_.attempt
.sent_total_compressed_message_size != nullptr) {
parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size
->Record(transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
->Record(outgoing_bytes, labels, opentelemetry::context::Context{});
}
if (parent_->otel_plugin_->client_.attempt
.rcvd_total_compressed_message_size != nullptr) {
parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
->Record(transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
->Record(incoming_bytes, labels, opentelemetry::context::Context{});
}
}
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel(

@ -84,8 +84,12 @@ class OpenTelemetryPluginImpl::ClientCallTracer
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override;
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /*latency*/) override;
void RecordAnnotation(absl::string_view /*annotation*/) override;
@ -109,6 +113,12 @@ class OpenTelemetryPluginImpl::ClientCallTracer
std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_;
bool is_trailers_only_ = false;
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
ClientCallTracer(

@ -37,6 +37,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -123,15 +124,29 @@ void OpenTelemetryPluginImpl::ServerCallTracer::RecordEnd(
if (otel_plugin_->server_.call.sent_total_compressed_message_size !=
nullptr) {
otel_plugin_->server_.call.sent_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.outgoing.data_bytes, labels,
opentelemetry::context::Context{});
grpc_core::IsCallTracerInTransportEnabled()
? outgoing_bytes_.load()
: final_info->stats.transport_stream_stats.outgoing.data_bytes,
labels, opentelemetry::context::Context{});
}
if (otel_plugin_->server_.call.rcvd_total_compressed_message_size !=
nullptr) {
otel_plugin_->server_.call.rcvd_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.incoming.data_bytes, labels,
opentelemetry::context::Context{});
grpc_core::IsCallTracerInTransportEnabled()
? incoming_bytes_.load()
: final_info->stats.transport_stream_stats.incoming.data_bytes,
labels, opentelemetry::context::Context{});
}
}
void OpenTelemetryPluginImpl::ServerCallTracer::RecordIncomingBytes(
const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void OpenTelemetryPluginImpl::ServerCallTracer::RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
} // namespace internal

@ -100,6 +100,11 @@ class OpenTelemetryPluginImpl::ServerCallTracer
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordAnnotation(absl::string_view /*annotation*/) override {
// Not implemented
}
@ -131,6 +136,12 @@ class OpenTelemetryPluginImpl::ServerCallTracer
injected_labels_from_plugin_options_;
OpenTelemetryPluginImpl* otel_plugin_;
std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config_;
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
} // namespace internal

@ -28,6 +28,7 @@
#include <grpc/slice.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/slice/slice.h"
namespace grpc_observability {
@ -273,19 +274,22 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
for (const auto& label : labels_from_peer_) {
context_.Labels().emplace_back(label);
}
RecordDoubleMetric(
kRpcClientSentBytesPerRpcMeasureName,
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0),
context_.Labels(), parent_->identifier_, parent_->registered_method_,
uint64_t incoming_bytes = 0;
uint64_t outgoing_bytes = 0;
if (grpc_core::IsCallTracerInTransportEnabled()) {
incoming_bytes = incoming_bytes_.load();
outgoing_bytes = outgoing_bytes_.load();
} else if (transport_stream_stats != nullptr) {
incoming_bytes = transport_stream_stats->incoming.data_bytes;
outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
}
RecordDoubleMetric(kRpcClientSentBytesPerRpcMeasureName,
static_cast<double>(outgoing_bytes), context_.Labels(),
parent_->identifier_, parent_->registered_method_,
/*include_exchange_labels=*/true);
RecordDoubleMetric(
kRpcClientReceivedBytesPerRpcMeasureName,
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0),
context_.Labels(), parent_->identifier_, parent_->registered_method_,
RecordDoubleMetric(kRpcClientReceivedBytesPerRpcMeasureName,
static_cast<double>(incoming_bytes), context_.Labels(),
parent_->identifier_, parent_->registered_method_,
/*include_exchange_labels=*/true);
RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
@ -302,6 +306,16 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
/*include_exchange_labels=*/true);
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status /*cancel_error*/) {}

@ -17,6 +17,7 @@
#include <stdint.h>
#include <atomic>
#include <string>
#include "absl/base/thread_annotations.h"
@ -69,6 +70,10 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /*latency*/) override;
void RecordAnnotation(absl::string_view annotation) override;
@ -97,6 +102,12 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
optional_labels_array_;
std::vector<Label> labels_from_peer_;
bool is_trailers_only_ = false;
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
explicit PythonOpenCensusCallTracer(

@ -36,6 +36,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
@ -175,8 +176,15 @@ void PythonOpenCensusServerCallTracer::RecordCancel(
void PythonOpenCensusServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
if (PythonCensusStatsEnabled()) {
const uint64_t request_size = GetIncomingDataSize(final_info);
const uint64_t response_size = GetOutgoingDataSize(final_info);
uint64_t outgoing_bytes;
uint64_t incoming_bytes;
if (grpc_core::IsCallTracerInTransportEnabled()) {
outgoing_bytes = outgoing_bytes_.load();
incoming_bytes = incoming_bytes_.load();
} else {
outgoing_bytes = GetOutgoingDataSize(final_info);
incoming_bytes = GetIncomingDataSize(final_info);
}
double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
context_.Labels().emplace_back(kServerMethod, std::string(method_));
context_.Labels().emplace_back(
@ -186,11 +194,11 @@ void PythonOpenCensusServerCallTracer::RecordEnd(
context_.Labels().emplace_back(label);
}
RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
static_cast<double>(response_size), context_.Labels(),
static_cast<double>(outgoing_bytes), context_.Labels(),
identifier_, registered_method_,
/*include_exchange_labels=*/true);
RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
static_cast<double>(request_size), context_.Labels(),
static_cast<double>(incoming_bytes), context_.Labels(),
identifier_, registered_method_,
/*include_exchange_labels=*/true);
RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
@ -218,6 +226,16 @@ void PythonOpenCensusServerCallTracer::RecordEnd(
delete this;
}
void PythonOpenCensusServerCallTracer::RecordIncomingBytes(
const TransportByteSize& transport_byte_size) {
incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void PythonOpenCensusServerCallTracer::RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) {
outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
}
void PythonOpenCensusServerCallTracer::RecordAnnotation(
absl::string_view annotation) {
if (!context_.GetSpanContext().IsSampled()) {

@ -15,6 +15,8 @@
#ifndef GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
#define GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
#include <atomic>
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "constants.h"
@ -102,6 +104,12 @@ class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override;
void RecordAnnotation(absl::string_view annotation) override;
void RecordAnnotation(const Annotation& annotation) override;
@ -124,6 +132,12 @@ class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
std::vector<Label> labels_from_peer_;
std::string identifier_;
bool registered_method_ = false;
// TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
// to promises, after which we can ensure that the transport invokes
// the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
// the call's party.
std::atomic<uint64_t> incoming_bytes_{0};
std::atomic<uint64_t> outgoing_bytes_{0};
};
} // namespace grpc_observability

@ -90,7 +90,11 @@ BAZEL_DEPS = os.path.join(
)
# the bazel target to scrape to get list of sources for the build
BAZEL_DEPS_QUERIES = ["//src/core:slice", "//src/core:ref_counted_string"]
BAZEL_DEPS_QUERIES = [
"//src/core:experiments",
"//src/core:slice",
"//src/core:ref_counted_string",
]
def _bazel_query(query):

@ -21,6 +21,8 @@ CC_FILES=[
'grpc_root/src/core/lib/debug/trace.cc',
'grpc_root/src/core/lib/debug/trace_flags.cc',
'grpc_root/src/core/lib/event_engine/thread_local.cc',
'grpc_root/src/core/lib/experiments/config.cc',
'grpc_root/src/core/lib/experiments/experiments.cc',
'grpc_root/src/core/lib/gprpp/crash.cc',
'grpc_root/src/core/lib/gprpp/examine_stack.cc',
'grpc_root/src/core/lib/gprpp/fork.cc',

@ -63,6 +63,11 @@ class FakeCallTracer : public ClientCallTracer {
public:
class FakeCallAttemptTracer : public CallAttemptTracer {
public:
FakeCallAttemptTracer() {
MutexLock lock(g_mu);
incoming_bytes_ = TransportByteSize();
outgoing_bytes_ = TransportByteSize();
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
@ -83,8 +88,30 @@ class FakeCallTracer : public ClientCallTracer {
absl::Status /*status*/,
grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) override {
if (IsCallTracerInTransportEnabled()) return;
TransportByteSize incoming_bytes = {
transport_stream_stats->incoming.framing_bytes,
transport_stream_stats->incoming.data_bytes,
transport_stream_stats->incoming.header_bytes};
TransportByteSize outgoing_bytes = {
transport_stream_stats->outgoing.framing_bytes,
transport_stream_stats->outgoing.data_bytes,
transport_stream_stats->outgoing.header_bytes};
MutexLock lock(g_mu);
incoming_bytes_ = incoming_bytes;
outgoing_bytes_ = outgoing_bytes;
}
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {
MutexLock lock(g_mu);
incoming_bytes_ += transport_byte_size;
}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {
MutexLock lock(g_mu);
transport_stream_stats_ = *transport_stream_stats;
outgoing_bytes_ += transport_byte_size;
}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
@ -101,14 +128,19 @@ class FakeCallTracer : public ClientCallTracer {
void SetOptionalLabel(OptionalLabelKey /*key*/,
RefCountedStringValue /*value*/) override {}
static grpc_transport_stream_stats transport_stream_stats() {
static TransportByteSize incoming_bytes() {
MutexLock lock(g_mu);
return incoming_bytes_;
}
static TransportByteSize outgoing_bytes() {
MutexLock lock(g_mu);
return transport_stream_stats_;
return outgoing_bytes_;
}
private:
static grpc_transport_stream_stats transport_stream_stats_
ABSL_GUARDED_BY(g_mu);
static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
};
explicit FakeCallTracer() {}
@ -126,11 +158,18 @@ class FakeCallTracer : public ClientCallTracer {
void RecordAnnotation(const Annotation& /*annotation*/) override {}
};
grpc_transport_stream_stats
FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats_;
CallTracerInterface::TransportByteSize
FakeCallTracer::FakeCallAttemptTracer::incoming_bytes_;
CallTracerInterface::TransportByteSize
FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes_;
class FakeServerCallTracer : public ServerCallTracer {
public:
FakeServerCallTracer() {
MutexLock lock(g_mu);
incoming_bytes_ = TransportByteSize();
outgoing_bytes_ = TransportByteSize();
}
~FakeServerCallTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
@ -152,28 +191,57 @@ class FakeServerCallTracer : public ServerCallTracer {
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordEnd(const grpc_call_final_info* final_info) override {
if (!IsCallTracerInTransportEnabled()) {
TransportByteSize incoming_bytes = {
final_info->stats.transport_stream_stats.incoming.framing_bytes,
final_info->stats.transport_stream_stats.incoming.data_bytes,
final_info->stats.transport_stream_stats.incoming.header_bytes};
TransportByteSize outgoing_bytes = {
final_info->stats.transport_stream_stats.outgoing.framing_bytes,
final_info->stats.transport_stream_stats.outgoing.data_bytes,
final_info->stats.transport_stream_stats.outgoing.header_bytes};
MutexLock lock(g_mu);
transport_stream_stats_ = final_info->stats.transport_stream_stats;
incoming_bytes_ = incoming_bytes;
outgoing_bytes_ = outgoing_bytes;
}
g_server_call_ended_notify->Notify();
}
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {
MutexLock lock(g_mu);
incoming_bytes_ += transport_byte_size;
}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {
MutexLock lock(g_mu);
outgoing_bytes_ += transport_byte_size;
}
void RecordAnnotation(absl::string_view /*annotation*/) override {}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
static grpc_transport_stream_stats transport_stream_stats() {
static TransportByteSize incoming_bytes() {
MutexLock lock(g_mu);
return incoming_bytes_;
}
static TransportByteSize outgoing_bytes() {
MutexLock lock(g_mu);
return transport_stream_stats_;
return outgoing_bytes_;
}
private:
static grpc_transport_stream_stats transport_stream_stats_
ABSL_GUARDED_BY(g_mu);
static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
};
grpc_transport_stream_stats FakeServerCallTracer::transport_stream_stats_;
CallTracerInterface::TransportByteSize FakeServerCallTracer::incoming_bytes_;
CallTracerInterface::TransportByteSize FakeServerCallTracer::outgoing_bytes_;
// TODO(yijiem): figure out how to reuse FakeStatsPlugin instead of
// inheriting and overriding it here.
@ -238,29 +306,32 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
g_client_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
g_server_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
auto client_transport_stats =
FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats();
auto server_transport_stats = FakeServerCallTracer::transport_stream_stats();
EXPECT_EQ(client_transport_stats.outgoing.data_bytes,
auto client_outgoing_transport_stats =
FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes();
auto client_incoming_transport_stats =
FakeCallTracer::FakeCallAttemptTracer::incoming_bytes();
auto server_outgoing_transport_stats = FakeServerCallTracer::outgoing_bytes();
auto server_incoming_transport_stats = FakeServerCallTracer::incoming_bytes();
EXPECT_EQ(client_outgoing_transport_stats.data_bytes,
send_from_client.size());
EXPECT_EQ(client_transport_stats.incoming.data_bytes,
EXPECT_EQ(client_incoming_transport_stats.data_bytes,
send_from_server.size());
EXPECT_EQ(server_transport_stats.outgoing.data_bytes,
EXPECT_EQ(server_outgoing_transport_stats.data_bytes,
send_from_server.size());
EXPECT_EQ(server_transport_stats.incoming.data_bytes,
EXPECT_EQ(server_incoming_transport_stats.data_bytes,
send_from_client.size());
// At the very minimum, we should have 9 bytes from initial header frame, 9
// bytes from data header frame, 5 bytes from the grpc header on data and 9
// bytes from the trailing header frame. The actual number might be more due
// to RST_STREAM (13 bytes) and WINDOW_UPDATE (13 bytes) frames.
EXPECT_GE(client_transport_stats.outgoing.framing_bytes, 32);
EXPECT_LE(client_transport_stats.outgoing.framing_bytes, 58);
EXPECT_GE(client_transport_stats.incoming.framing_bytes, 32);
EXPECT_LE(client_transport_stats.incoming.framing_bytes, 58);
EXPECT_GE(server_transport_stats.outgoing.framing_bytes, 32);
EXPECT_LE(server_transport_stats.outgoing.framing_bytes, 58);
EXPECT_GE(server_transport_stats.incoming.framing_bytes, 32);
EXPECT_LE(server_transport_stats.incoming.framing_bytes, 58);
EXPECT_GE(client_outgoing_transport_stats.framing_bytes, 32);
EXPECT_LE(client_outgoing_transport_stats.framing_bytes, 58);
EXPECT_GE(client_incoming_transport_stats.framing_bytes, 32);
EXPECT_LE(client_incoming_transport_stats.framing_bytes, 58);
EXPECT_GE(server_outgoing_transport_stats.framing_bytes, 32);
EXPECT_LE(server_outgoing_transport_stats.framing_bytes, 58);
EXPECT_GE(server_incoming_transport_stats.framing_bytes, 32);
EXPECT_LE(server_incoming_transport_stats.framing_bytes, 58);
delete g_client_call_ended_notify;
g_client_call_ended_notify = nullptr;

@ -87,6 +87,10 @@ class FakeClientCallTracer : public ClientCallTracer {
const grpc_transport_stream_stats* /*transport_stream_stats*/)
override {}
void RecordEnd(const gpr_timespec& /*latency*/) override { Unref(); }
void RecordIncomingBytes(
const TransportByteSize& /*transport_byte_size*/) override {}
void RecordOutgoingBytes(
const TransportByteSize& /*transport_byte_size*/) override {}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
@ -180,6 +184,10 @@ class FakeServerCallTracer : public ServerCallTracer {
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordEnd(const grpc_call_final_info* /*final_info*/) override {}
void RecordIncomingBytes(
const TransportByteSize& /*transport_byte_size*/) override {}
void RecordOutgoingBytes(
const TransportByteSize& /*transport_byte_size*/) override {}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}

@ -148,6 +148,39 @@ static void CrashOnAppendError(absl::string_view, const grpc_core::Slice&) {
abort();
}
namespace grpc_core {
class FakeCallTracer final : public CallTracerInterface {
public:
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override {}
void RecordSendMessage(const SliceBuffer& send_message) override {}
void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override {}
void RecordReceivedMessage(const SliceBuffer& recv_message) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) override {}
void RecordCancel(grpc_error_handle cancel_error) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void RecordAnnotation(absl::string_view annotation) override {}
void RecordAnnotation(const Annotation& annotation) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
};
} // namespace grpc_core
grpc_slice EncodeHeaderIntoBytes(
bool is_eof,
const std::vector<std::pair<std::string, std::string>>& header_fields) {
@ -161,14 +194,13 @@ grpc_slice EncodeHeaderIntoBytes(
CrashOnAppendError);
}
grpc_transport_one_way_stats stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_core::HPackCompressor::EncodeHeaderOptions hopt{
0xdeadbeef, // stream_id
is_eof, // is_eof
false, // use_true_binary_metadata
16384, // max_frame_size
&stats // stats
};
&call_tracer};
grpc_slice_buffer output;
grpc_slice_buffer_init(&output);
@ -306,14 +338,13 @@ static void verify_continuation_headers(const char* key, const char* value,
b.Append(key, grpc_core::Slice::FromStaticString(value), CrashOnAppendError);
grpc_slice_buffer_init(&output);
grpc_transport_one_way_stats stats;
stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_core::HPackCompressor::EncodeHeaderOptions hopt = {
0xdeadbeef, // stream_id
is_eof, // is_eof
false, // use_true_binary_metadata
150, // max_frame_size
&stats /* stats */};
&call_tracer};
g_compressor->EncodeHeaders(hopt, b, &output);
verify_frames(output, is_eof);
grpc_slice_buffer_destroy(&output);
@ -344,8 +375,7 @@ TEST(HpackEncoderTest, EncodeBinaryAsBase64) {
"Base64, a tool\nTo encode binary data into "
"text\nSo it can be shared."),
CrashOnAppendError);
grpc_transport_one_way_stats stats;
stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_slice_buffer output;
grpc_slice_buffer_init(&output);
grpc_core::HPackCompressor::EncodeHeaderOptions hopt = {
@ -353,7 +383,7 @@ TEST(HpackEncoderTest, EncodeBinaryAsBase64) {
true, // is_eof
false, // use_true_binary_metadata
150, // max_frame_size
&stats};
&call_tracer};
grpc_core::HPackCompressor compressor;
compressor.EncodeHeaders(hopt, b, &output);
grpc_slice_buffer_destroy(&output);
@ -369,8 +399,7 @@ TEST(HpackEncoderTest, EncodeBinaryAsTrueBinary) {
"Base64, a tool\nTo encode binary data into "
"text\nSo it can be shared."),
CrashOnAppendError);
grpc_transport_one_way_stats stats;
stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_slice_buffer output;
grpc_slice_buffer_init(&output);
grpc_core::HPackCompressor::EncodeHeaderOptions hopt = {
@ -378,7 +407,7 @@ TEST(HpackEncoderTest, EncodeBinaryAsTrueBinary) {
true, // is_eof
true, // use_true_binary_metadata
150, // max_frame_size
&stats};
&call_tracer};
grpc_core::HPackCompressor compressor;
compressor.EncodeHeaders(hopt, b, &output);
grpc_slice_buffer_destroy(&output);

@ -828,8 +828,11 @@ TEST_F(StatsPluginEnd2EndTest, TestMetadataSizeAnnotations) {
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
// Check presence of metadata size annotations in client span.
auto sent_span_data =
GetSpanByName(recorded_spans, absl::StrCat("Sent.", client_method_name_));
auto sent_span_data = GetSpanByName(
recorded_spans,
absl::StrCat(
grpc_core::IsCallTracerInTransportEnabled() ? "Attempt." : "Sent.",
client_method_name_));
ASSERT_NE(sent_span_data, recorded_spans.end());
EXPECT_TRUE(IsAnnotationPresent(
sent_span_data,
@ -871,8 +874,11 @@ TEST_F(StatsPluginEnd2EndTest, TestHttpAnnotations) {
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
auto client_span_data =
GetSpanByName(recorded_spans, absl::StrCat("Sent.", client_method_name_));
auto client_span_data = GetSpanByName(
recorded_spans,
absl::StrCat(
grpc_core::IsCallTracerInTransportEnabled() ? "Attempt." : "Sent.",
client_method_name_));
ASSERT_NE(client_span_data, recorded_spans.end());
EXPECT_TRUE(IsAnnotationPresent(client_span_data,
"HttpAnnotation type: Start time: .* "

@ -54,6 +54,39 @@ static grpc_slice MakeSlice(const std::vector<uint8_t>& bytes) {
return s;
}
namespace grpc_core {
class FakeCallTracer final : public CallTracerInterface {
public:
void RecordIncomingBytes(
const TransportByteSize& transport_byte_size) override {}
void RecordOutgoingBytes(
const TransportByteSize& transport_byte_size) override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override {}
void RecordSendMessage(const SliceBuffer& send_message) override {}
void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override {}
void RecordReceivedMessage(const SliceBuffer& recv_message) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) override {}
void RecordCancel(grpc_error_handle cancel_error) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void RecordAnnotation(absl::string_view annotation) override {}
void RecordAnnotation(const Annotation& annotation) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
};
} // namespace grpc_core
////////////////////////////////////////////////////////////////////////////////
// HPACK encoder
//
@ -80,8 +113,7 @@ static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) {
saved_now + grpc_core::Duration::Seconds(30));
grpc_core::HPackCompressor c;
grpc_transport_one_way_stats stats;
stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_slice_buffer outbuf;
grpc_slice_buffer_init(&outbuf);
while (state.KeepRunning()) {
@ -91,7 +123,7 @@ static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) {
true,
false,
size_t{1024},
&stats,
&call_tracer,
},
b, &outbuf);
grpc_slice_buffer_reset_and_unref(&outbuf);
@ -110,8 +142,7 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State& state) {
Fixture::Prepare(&b);
grpc_core::HPackCompressor c;
grpc_transport_one_way_stats stats;
stats = {};
grpc_core::FakeCallTracer call_tracer;
grpc_slice_buffer outbuf;
grpc_slice_buffer_init(&outbuf);
while (state.KeepRunning()) {
@ -122,7 +153,7 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State& state) {
state.range(0) != 0,
Fixture::kEnableTrueBinary,
static_cast<size_t>(state.range(1) + kEnsureMaxFrameAtLeast),
&stats,
&call_tracer,
},
b, &outbuf);
if (!logged_representative_output && state.iterations() > 3) {
@ -383,9 +414,8 @@ class FromEncoderFixture {
EncoderFixture::Prepare(&b);
grpc_core::HPackCompressor c;
grpc_transport_one_way_stats stats;
grpc_core::FakeCallTracer call_tracer;
std::vector<grpc_slice> out;
stats = {};
bool done = false;
int i = 0;
while (!done) {
@ -397,7 +427,7 @@ class FromEncoderFixture {
false,
EncoderFixture::kEnableTrueBinary,
1024 * 1024,
&stats,
&call_tracer,
},
b, &outbuf);
if (i == iteration) {

Loading…
Cancel
Save