OpenCensus: Use new CallTracer interfaces (#32618)

This change mostly aims to get OpenCensus to use the new
ServerCallTracer interface. Note that the interfaces nor the code are in
their final states. There are a bunch of moving pieces, but I thought
this might be a nice mid-step to check-in and make sure that our
internal traces can also work with these changes.

Overall changes -
1) call_tracer.h shows what the hierarchy of new call tracer interfaces
looks like. Open to renaming suggestions.
2) Moved most of the common interface between `CallAttemptTracer` and
`ServerCallTracer` into a common `CallTracerInterface`. We should be
able to eventually move `RecordReceivedTrailingMetadata` and `RecordEnd`
as well to these common interfaces, but it requires some additional
work.
3) The compression filter is now responsible for recording the recv and
send messages for both the subchannel call and the server, and adds in
ability to record compressed and decompressed messages as well.
4) The OpenCensus server filter now uses the new `ServerCallTracer`
interface, and so doesn't need to be a filter anymore.
5) A new ServerCallTracerFilter was added. Ideally, we should be able to
move it to the current connected filter, but it is in a bit of an
interesting state right now, so I would prefer making those changes in a
separate PR with Craig's eyes on it.
6) A new context element `GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE`
was created that replaces the old `GRPC_CONTEXT_CALL_TRACER`, and the
new `GRPC_CONTEXT_CALL_TRACER` is mainly to pass the `CallAttemptTracer`
down the stack. This should go away in the new promise-based world.



<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

<!-- Reviewable:start -->
- - -
This change is [<img src="https://reviewable.io/review_button.svg"
height="34" align="absmiddle"
alt="Reviewable"/>](https://reviewable.io/reviews/grpc/grpc/32618)
<!-- Reviewable:end -->
pull/32359/head^2
Yash Tibrewal 2 years ago committed by GitHub
parent d025d50b54
commit 5029af9578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      BUILD
  2. 12
      CMakeLists.txt
  3. 6
      Makefile
  4. 20
      build_autogenerated.yaml
  5. 3
      config.m4
  6. 3
      config.w32
  7. 4
      gRPC-C++.podspec
  8. 7
      gRPC-Core.podspec
  9. 5
      grpc.gemspec
  10. 9
      grpc.gyp
  11. 5
      package.xml
  12. 61
      src/core/ext/filters/client_channel/client_channel.cc
  13. 13
      src/core/ext/filters/client_channel/client_channel.h
  14. 19
      src/core/ext/filters/http/message_compress/compression_filter.cc
  15. 4
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  16. 3
      src/core/ext/xds/xds_channel_stack_modifier.cc
  17. 2
      src/core/lib/channel/call_tracer.cc
  18. 130
      src/core/lib/channel/call_tracer.h
  19. 9
      src/core/lib/channel/context.h
  20. 97
      src/core/lib/channel/server_call_tracer.h
  21. 110
      src/core/lib/channel/server_call_tracer_filter.cc
  22. 2
      src/core/lib/surface/builtins.cc
  23. 54
      src/core/lib/surface/call.cc
  24. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  25. 8
      src/cpp/ext/filters/census/client_filter.cc
  26. 12
      src/cpp/ext/filters/census/grpc_plugin.cc
  27. 9
      src/cpp/ext/filters/census/open_census_call_tracer.h
  28. 152
      src/cpp/ext/filters/census/server_call_tracer.cc
  29. 40
      src/cpp/ext/filters/census/server_call_tracer.h
  30. 54
      src/cpp/ext/filters/census/server_filter.h
  31. 3
      src/python/grpcio/grpc_core_dependencies.py
  32. 21
      test/core/channel/minimal_stack_is_minimal_test.cc
  33. 5
      test/core/channel/server_call_tracer_factory_test.cc
  34. 39
      test/core/xds/xds_channel_stack_modifier_test.cc
  35. 5
      tools/doxygen/Doxyfile.c++.internal
  36. 5
      tools/doxygen/Doxyfile.core.internal

13
BUILD

@ -1216,8 +1216,9 @@ grpc_cc_library(
"//src/core:lib/channel/channelz.cc",
"//src/core:lib/channel/channelz_registry.cc",
"//src/core:lib/channel/connected_channel.cc",
"//src/core:lib/channel/server_call_tracer_filter.cc",
"//src/core:lib/channel/promise_based_filter.cc",
"//src/core:lib/channel/server_call_tracer.cc",
"//src/core:lib/channel/call_tracer.cc",
"//src/core:lib/channel/status_util.cc",
"//src/core:lib/compression/compression.cc",
"//src/core:lib/compression/compression_internal.cc",
@ -1328,7 +1329,6 @@ grpc_cc_library(
"//src/core:lib/channel/connected_channel.h",
"//src/core:lib/channel/context.h",
"//src/core:lib/channel/promise_based_filter.h",
"//src/core:lib/channel/server_call_tracer.h",
"//src/core:lib/channel/status_util.h",
"//src/core:lib/compression/compression_internal.h",
"//src/core:lib/compression/message_compress.h",
@ -1463,6 +1463,7 @@ grpc_cc_library(
"//src/core:basic_join",
"//src/core:basic_seq",
"//src/core:bitset",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",
@ -2190,7 +2191,7 @@ grpc_cc_library(
"src/cpp/ext/filters/census/grpc_plugin.cc",
"src/cpp/ext/filters/census/measures.cc",
"src/cpp/ext/filters/census/rpc_encoding.cc",
"src/cpp/ext/filters/census/server_filter.cc",
"src/cpp/ext/filters/census/server_call_tracer.cc",
"src/cpp/ext/filters/census/views.cc",
],
hdrs = [
@ -2201,7 +2202,7 @@ grpc_cc_library(
"src/cpp/ext/filters/census/measures.h",
"src/cpp/ext/filters/census/open_census_call_tracer.h",
"src/cpp/ext/filters/census/rpc_encoding.h",
"src/cpp/ext/filters/census/server_filter.h",
"src/cpp/ext/filters/census/server_call_tracer.h",
],
external_deps = [
"absl/base",
@ -2231,16 +2232,12 @@ grpc_cc_library(
"grpc_public_hdrs",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:experiments",
"//src/core:map",
"//src/core:pipe",
"//src/core:poll",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:slice_refcount",

12
CMakeLists.txt generated

@ -2155,6 +2155,7 @@ add_library(grpc
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/backoff/backoff.cc
src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc
src/core/lib/channel/channel_stack.cc
@ -2165,7 +2166,7 @@ add_library(grpc
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
src/core/lib/compression/compression.cc
src/core/lib/compression/compression_internal.cc
@ -2844,6 +2845,7 @@ add_library(grpc_unsecure
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/backoff/backoff.cc
src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc
src/core/lib/channel/channel_stack.cc
@ -2854,7 +2856,7 @@ add_library(grpc_unsecure
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
src/core/lib/compression/compression.cc
src/core/lib/compression/compression_internal.cc
@ -4363,6 +4365,7 @@ add_library(grpc_authorization_provider
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc
src/core/lib/channel/channel_stack.cc
@ -4373,7 +4376,7 @@ add_library(grpc_authorization_provider
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
src/core/lib/compression/compression.cc
src/core/lib/compression/compression_internal.cc
@ -11424,6 +11427,7 @@ add_executable(frame_test
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc
src/core/lib/channel/channel_stack.cc
@ -11434,7 +11438,7 @@ add_executable(frame_test
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
src/core/lib/compression/compression.cc
src/core/lib/compression/compression_internal.cc

6
Makefile generated

@ -1404,6 +1404,7 @@ LIBGRPC_SRC = \
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/sockaddr_utils.cc \
src/core/lib/backoff/backoff.cc \
src/core/lib/channel/call_tracer.cc \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args_preconditioning.cc \
src/core/lib/channel/channel_stack.cc \
@ -1414,7 +1415,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
src/core/lib/compression/compression.cc \
src/core/lib/compression/compression_internal.cc \
@ -1946,6 +1947,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/sockaddr_utils.cc \
src/core/lib/backoff/backoff.cc \
src/core/lib/channel/call_tracer.cc \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args_preconditioning.cc \
src/core/lib/channel/channel_stack.cc \
@ -1956,7 +1958,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
src/core/lib/compression/compression.cc \
src/core/lib/compression/compression_internal.cc \

@ -781,7 +781,6 @@ libs:
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/server_call_tracer.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@ -942,6 +941,7 @@ libs:
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
@ -1553,6 +1553,7 @@ libs:
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/backoff/backoff.cc
- src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc
- src/core/lib/channel/channel_stack.cc
@ -1563,7 +1564,7 @@ libs:
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
- src/core/lib/compression/compression.cc
- src/core/lib/compression/compression_internal.cc
@ -2123,7 +2124,6 @@ libs:
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/server_call_tracer.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@ -2281,6 +2281,7 @@ libs:
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
@ -2508,6 +2509,7 @@ libs:
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/backoff/backoff.cc
- src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc
- src/core/lib/channel/channel_stack.cc
@ -2518,7 +2520,7 @@ libs:
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
- src/core/lib/compression/compression.cc
- src/core/lib/compression/compression_internal.cc
@ -3586,7 +3588,6 @@ libs:
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/server_call_tracer.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@ -3738,6 +3739,7 @@ libs:
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
@ -3850,6 +3852,7 @@ libs:
- src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc
- src/core/lib/channel/channel_stack.cc
@ -3860,7 +3863,7 @@ libs:
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
- src/core/lib/compression/compression.cc
- src/core/lib/compression/compression_internal.cc
@ -7395,7 +7398,6 @@ targets:
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/server_call_tracer.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@ -7546,6 +7548,7 @@ targets:
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
@ -7642,6 +7645,7 @@ targets:
- src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc
- src/core/lib/channel/channel_stack.cc
@ -7652,7 +7656,7 @@ targets:
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
- src/core/lib/compression/compression.cc
- src/core/lib/compression/compression_internal.cc

3
config.m4 generated

@ -485,6 +485,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/sockaddr_utils.cc \
src/core/lib/backoff/backoff.cc \
src/core/lib/channel/call_tracer.cc \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args_preconditioning.cc \
src/core/lib/channel/channel_stack.cc \
@ -495,7 +496,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
src/core/lib/compression/compression.cc \
src/core/lib/compression/compression_internal.cc \

3
config.w32 generated

@ -451,6 +451,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\address_utils\\parse_address.cc " +
"src\\core\\lib\\address_utils\\sockaddr_utils.cc " +
"src\\core\\lib\\backoff\\backoff.cc " +
"src\\core\\lib\\channel\\call_tracer.cc " +
"src\\core\\lib\\channel\\channel_args.cc " +
"src\\core\\lib\\channel\\channel_args_preconditioning.cc " +
"src\\core\\lib\\channel\\channel_stack.cc " +
@ -461,7 +462,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\channel\\channelz_registry.cc " +
"src\\core\\lib\\channel\\connected_channel.cc " +
"src\\core\\lib\\channel\\promise_based_filter.cc " +
"src\\core\\lib\\channel\\server_call_tracer.cc " +
"src\\core\\lib\\channel\\server_call_tracer_filter.cc " +
"src\\core\\lib\\channel\\status_util.cc " +
"src\\core\\lib\\compression\\compression.cc " +
"src\\core\\lib\\compression\\compression_internal.cc " +

4
gRPC-C++.podspec generated

@ -725,7 +725,6 @@ Pod::Spec.new do |s|
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/server_call_tracer.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@ -912,6 +911,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
@ -1660,7 +1660,6 @@ Pod::Spec.new do |s|
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/server_call_tracer.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@ -1847,6 +1846,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',

7
gRPC-Core.podspec generated

@ -1086,6 +1086,7 @@ Pod::Spec.new do |s|
'src/core/lib/backoff/backoff.cc',
'src/core/lib/backoff/backoff.h',
'src/core/lib/channel/call_finalization.h',
'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/call_tracer.h',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args.h',
@ -1109,8 +1110,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/context.h',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/server_call_tracer.cc',
'src/core/lib/channel/server_call_tracer.h',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression.cc',
@ -1486,6 +1486,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
@ -2348,7 +2349,6 @@ Pod::Spec.new do |s|
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/server_call_tracer.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@ -2535,6 +2535,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',

5
grpc.gemspec generated

@ -995,6 +995,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/backoff/backoff.cc )
s.files += %w( src/core/lib/backoff/backoff.h )
s.files += %w( src/core/lib/channel/call_finalization.h )
s.files += %w( src/core/lib/channel/call_tracer.cc )
s.files += %w( src/core/lib/channel/call_tracer.h )
s.files += %w( src/core/lib/channel/channel_args.cc )
s.files += %w( src/core/lib/channel/channel_args.h )
@ -1018,8 +1019,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/context.h )
s.files += %w( src/core/lib/channel/promise_based_filter.cc )
s.files += %w( src/core/lib/channel/promise_based_filter.h )
s.files += %w( src/core/lib/channel/server_call_tracer.cc )
s.files += %w( src/core/lib/channel/server_call_tracer.h )
s.files += %w( src/core/lib/channel/server_call_tracer_filter.cc )
s.files += %w( src/core/lib/channel/status_util.cc )
s.files += %w( src/core/lib/channel/status_util.h )
s.files += %w( src/core/lib/compression/compression.cc )
@ -1395,6 +1395,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/activity.cc )
s.files += %w( src/core/lib/promise/activity.h )
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/cancel_callback.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_join.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )

9
grpc.gyp generated

@ -817,6 +817,7 @@
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args_preconditioning.cc',
'src/core/lib/channel/channel_stack.cc',
@ -827,7 +828,7 @@
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
'src/core/lib/compression/compression.cc',
'src/core/lib/compression/compression_internal.cc',
@ -1301,6 +1302,7 @@
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args_preconditioning.cc',
'src/core/lib/channel/channel_stack.cc',
@ -1311,7 +1313,7 @@
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
'src/core/lib/compression/compression.cc',
'src/core/lib/compression/compression_internal.cc',
@ -1809,6 +1811,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c',
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',
'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args_preconditioning.cc',
'src/core/lib/channel/channel_stack.cc',
@ -1819,7 +1822,7 @@
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
'src/core/lib/compression/compression.cc',
'src/core/lib/compression/compression_internal.cc',

5
package.xml generated

@ -977,6 +977,7 @@
<file baseinstalldir="/" name="src/core/lib/backoff/backoff.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/backoff/backoff.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/call_finalization.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/call_tracer.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/call_tracer.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_args.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_args.h" role="src" />
@ -1000,8 +1001,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/promise_based_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/promise_based_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/server_call_tracer.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/server_call_tracer.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/server_call_tracer_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/status_util.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/status_util.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/compression.cc" role="src" />
@ -1377,6 +1377,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/activity.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/activity.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/arena_promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/cancel_callback.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_join.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_seq.h" role="src" />

@ -1907,8 +1907,8 @@ absl::optional<absl::Status> ClientChannel::CallData::CheckResolution(
}
// If the call was queued, add trace annotation.
if (was_queued) {
auto* call_tracer = static_cast<CallTracer*>(
call_context()[GRPC_CONTEXT_CALL_TRACER].value);
auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
if (call_tracer != nullptr) {
call_tracer->RecordAnnotation("Delayed name resolution complete.");
}
@ -2504,12 +2504,14 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor
namespace {
CallTracer::CallAttemptTracer* GetCallAttemptTracer(
ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer(
grpc_call_context_element* context, bool is_transparent_retry) {
auto* call_tracer =
static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
auto* call_tracer = static_cast<ClientCallTracer*>(
context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
if (call_tracer == nullptr) return nullptr;
return call_tracer->StartNewAttempt(is_transparent_retry);
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
return tracer;
}
} // namespace
@ -2523,9 +2525,8 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
? "LoadBalancedCall"
: nullptr),
chand_(chand),
call_dispatch_controller_(call_dispatch_controller),
call_attempt_tracer_(
GetCallAttemptTracer(call_context, is_transparent_retry)) {
call_dispatch_controller_(call_dispatch_controller) {
CreateCallAttemptTracer(call_context, is_transparent_retry);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
}
@ -2539,10 +2540,10 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
void ClientChannel::LoadBalancedCall::Orphan() {
// Compute latency and report it to the tracer.
if (call_attempt_tracer_ != nullptr) {
if (call_attempt_tracer() != nullptr) {
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
call_attempt_tracer_->RecordEnd(latency);
call_attempt_tracer()->RecordEnd(latency);
}
Unref();
}
@ -2552,8 +2553,8 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion(
grpc_transport_stream_stats* transport_stream_stats,
absl::string_view peer_address) {
// If we have a tracer, notify it.
if (call_attempt_tracer_ != nullptr) {
call_attempt_tracer_->RecordReceivedTrailingMetadata(
if (call_attempt_tracer() != nullptr) {
call_attempt_tracer()->RecordReceivedTrailingMetadata(
status, recv_trailing_metadata, transport_stream_stats);
}
// If the LB policy requested a callback for trailing metadata, invoke
@ -2647,8 +2648,8 @@ absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
}
// Pick is complete.
// If it was queued, add a trace annotation.
if (was_queued && call_attempt_tracer_ != nullptr) {
call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete.");
if (was_queued && call_attempt_tracer() != nullptr) {
call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
}
// If the pick failed, fail the call.
if (!error.ok()) {
@ -2915,7 +2916,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: batch started from above: %s, "
"call_attempt_tracer_=%p",
"call_attempt_tracer()=%p",
chand(), this,
grpc_transport_stream_op_batch_string(batch, false).c_str(),
call_attempt_tracer());
@ -2931,10 +2932,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
call_attempt_tracer()->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
}
if (batch->send_message) {
call_attempt_tracer()->RecordSendMessage(
*batch->payload->send_message.send_message);
}
if (batch->send_trailing_metadata) {
call_attempt_tracer()->RecordSendTrailingMetadata(
batch->payload->send_trailing_metadata.send_trailing_metadata);
@ -2950,13 +2947,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_ready_;
}
if (batch->recv_message) {
recv_message_ = batch->payload->recv_message.recv_message;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr);
batch->payload->recv_message.recv_message_ready = &recv_message_ready_;
}
}
// Intercept recv_trailing_metadata even if there is no call tracer,
// since we may need to notify the LB policy about trailing metadata.
@ -3046,32 +3036,19 @@ void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
if (error.ok()) {
// recv_initial_metadata_flags is not populated for clients
self->call_attempt_tracer()->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, 0 /* recv_initial_metadata_flags */);
self->recv_initial_metadata_);
}
Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
error);
}
void ClientChannel::FilterBasedLoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: got recv_message_ready: error=%s",
self->chand(), self, StatusToString(error).c_str());
}
if (self->recv_message_->has_value()) {
self->call_attempt_tracer()->RecordReceivedMessage(**self->recv_message_);
}
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_, error);
}
void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
"call_attempt_tracer_=%p lb_subchannel_call_tracker_=%p "
"call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
"failure_error_=%s",
self->chand(), self, StatusToString(error).c_str(),
self->call_attempt_tracer(), self->lb_subchannel_call_tracker(),

@ -63,7 +63,6 @@
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -388,8 +387,9 @@ class ClientChannel::LoadBalancedCall
ConfigSelector::CallDispatchController* call_dispatch_controller() const {
return call_dispatch_controller_;
}
CallTracer::CallAttemptTracer* call_attempt_tracer() const {
return call_attempt_tracer_;
ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const {
return static_cast<ClientCallTracer::CallAttemptTracer*>(
call_context()[GRPC_CONTEXT_CALL_TRACER].value);
}
gpr_cycle_counter lb_call_start_time() const { return lb_call_start_time_; }
ConnectedSubchannel* connected_subchannel() const {
@ -442,7 +442,6 @@ class ClientChannel::LoadBalancedCall
ClientChannel* chand_;
ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
@ -525,7 +524,6 @@ class ClientChannel::FilterBasedLoadBalancedCall
static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error);
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
static void RecvMessageReady(void* arg, grpc_error_handle error);
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
// Called to perform a pick, both when the call is initially started
@ -568,11 +566,6 @@ class ClientChannel::FilterBasedLoadBalancedCall
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
// For intercepting recv_message_ready.
absl::optional<SliceBuffer>* recv_message_ = nullptr;
grpc_closure recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
// For intercepting recv_trailing_metadata_ready.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_transport_stream_stats* transport_stream_stats_ = nullptr;

@ -36,6 +36,7 @@
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
@ -113,6 +114,12 @@ MessageHandle CompressionFilter::CompressMessage(
gpr_log(GPR_ERROR, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
message->payload()->Length(), algorithm, message->flags());
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
// crime/beast like vulns).
@ -143,6 +150,9 @@ MessageHandle CompressionFilter::CompressMessage(
}
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
if (call_tracer != nullptr) {
call_tracer->RecordSendCompressedMessage(*message->payload());
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
@ -163,6 +173,12 @@ absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
message->payload()->Length(),
args.max_recv_message_length.value_or(-1), args.algorithm);
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message->payload());
}
// Check max message length.
if (args.max_recv_message_length.has_value() &&
message->payload()->Length() >
@ -189,6 +205,9 @@ absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
message->payload()->Swap(&decompressed_slices);
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
if (call_tracer != nullptr) {
call_tracer->RecordReceivedDecompressedMessage(*message->payload());
}
return std::move(message);
}

@ -211,9 +211,9 @@ void MaybeRecordTransportAnnotation(grpc_chttp2_stream* s,
if (!grpc_core::IsTraceRecordCallopsEnabled()) {
return;
}
grpc_core::CallTracer* call_tracer = static_cast<grpc_core::CallTracer*>(
auto* call_tracer = static_cast<grpc_core::CallTracerInterface*>(
static_cast<grpc_call_context_element*>(
s->context)[GRPC_CONTEXT_CALL_TRACER]
s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value);
if (!call_tracer) {
return;

@ -62,8 +62,7 @@ bool XdsChannelStackModifier::ModifyChannelStack(ChannelStackBuilder* builder) {
auto it = builder->mutable_stack()->begin();
while (it != builder->mutable_stack()->end()) {
const char* filter_name_at_it = (*it)->name;
if (strcmp("census_server", filter_name_at_it) == 0 ||
strcmp("opencensus_server", filter_name_at_it) == 0) {
if (strcmp("census_server", filter_name_at_it) == 0) {
break;
}
++it;

@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/server_call_tracer.h"
#include "src/core/lib/channel/call_tracer.h"
namespace grpc_core {

@ -21,77 +21,141 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
// The interface hierarchy is as follows -
// CallTracerAnnotationInterface
// / \
// ClientCallTracer CallTracerInterface
// / \
// CallAttemptTracer ServerCallTracer
// The base class for all tracer implementations.
class CallTracerAnnotationInterface {
public:
virtual ~CallTracerAnnotationInterface() {}
// Records an annotation on the call attempt.
// TODO(yashykt): If needed, extend this to attach attributes with
// annotations.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
};
// The base class for CallAttemptTracer and ServerCallTracer.
// TODO(yashykt): What's a better name for this?
class CallTracerInterface : public CallTracerAnnotationInterface {
public:
~CallTracerInterface() override {}
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const SliceBuffer& send_message) = 0;
// Only invoked if message was actually compressed.
virtual void RecordSendCompressedMessage(
const SliceBuffer& send_compressed_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) = 0;
virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0;
// Only invoked if message was actually decompressed.
virtual void RecordReceivedDecompressedMessage(
const SliceBuffer& recv_decompressed_message) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
};
// Interface for a tracer that records activities on a call. Actual attempts for
// this call are traced with CallAttemptTracer after invoking RecordNewAttempt()
// on the CallTracer object.
class CallTracer {
// on the ClientCallTracer object.
class ClientCallTracer : public CallTracerAnnotationInterface {
public:
// Interface for a tracer that records activities on a particular call
// attempt.
// (A single RPC can have multiple attempts due to retry/hedging policies or
// as transparent retry attempts.)
class CallAttemptTracer {
class CallAttemptTracer : public CallTracerInterface {
public:
virtual ~CallAttemptTracer() {}
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const SliceBuffer& send_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0;
virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0;
~CallAttemptTracer() override {}
// TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata`
// and `RecordEnd` should be moved into CallTracerInterface.
// If the call was cancelled before the recv_trailing_metadata op
// was started, recv_trailing_metadata and transport_stream_stats
// will be null.
virtual void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.
virtual void RecordEnd(const gpr_timespec& latency) = 0;
// Records an annotation on the call attempt.
// TODO(yashykt): If needed, extend this to attach attributes with
// annotations.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
};
virtual ~CallTracer() {}
~ClientCallTracer() override {}
// Records a new attempt for the associated call. \a transparent denotes
// whether the attempt is being made as a transparent retry or as a
// non-transparent retry/heding attempt. (There will be at least one attempt
// even if the call is not being retried.) The `CallTracer` object retains
// ownership to the newly created `CallAttemptTracer` object. RecordEnd()
// serves as an indication that the call stack is done with all API calls, and
// the tracer library is free to destroy it after that.
// even if the call is not being retried.) The `ClientCallTracer` object
// retains ownership to the newly created `CallAttemptTracer` object.
// RecordEnd() serves as an indication that the call stack is done with all
// API calls, and the tracer library is free to destroy it after that.
virtual CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) = 0;
// Records an annotation on the call attempt.
// TODO(yashykt): If needed, extend this to attach attributes with
// annotations.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
};
// Interface for a tracer that records activities on a server call.
class ServerCallTracer : public CallTracerInterface {
public:
~ServerCallTracer() override {}
// TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata`
// and `RecordEnd` should be moved into CallTracerInterface.
virtual void RecordReceivedTrailingMetadata(
grpc_metadata_batch* recv_trailing_metadata) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.
virtual void RecordEnd(const grpc_call_final_info* final_info) = 0;
};
// Interface for a factory that can create a ServerCallTracer object per
// server call.
class ServerCallTracerFactory {
public:
struct RawPointerChannelArgTag {};
virtual ~ServerCallTracerFactory() {}
virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0;
// Use this method to get the server call tracer factory from channel args,
// instead of directly fetching it with `GetObject`.
static ServerCallTracerFactory* Get(const ChannelArgs& channel_args);
// Registers a global ServerCallTracerFactory that wil be used by default if
// no corresponding channel arg was found. It is only valid to call this
// before grpc_init(). It is the responsibility of the caller to maintain
// this for the lifetime of the process.
static void RegisterGlobal(ServerCallTracerFactory* factory);
static absl::string_view ChannelArgName();
};
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder);
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_CHANNEL_CALL_TRACER_H

@ -36,7 +36,14 @@ typedef enum {
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
/// Value is a CallTracer object.
/// Value is a CallTracerAnnotationInterface. (ClientCallTracer object on the
/// client-side call, or ServerCallTracer on the server-side.)
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
/// Value is a CallTracerInterface (ServerCallTracer on the server-side,
/// CallAttemptTracer on a subchannel call.)
/// TODO(yashykt): Maybe come up with a better name. This will go away in the
/// future anyway, so not super important.
GRPC_CONTEXT_CALL_TRACER,
/// Reserved for traffic_class_context.

@ -1,97 +0,0 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H
#define GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
// Interface for a tracer that records activities on a server call.
class ServerCallTracer {
public:
virtual ~ServerCallTracer() {}
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const SliceBuffer& send_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0;
virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0;
// If the call was cancelled before the recv_trailing_metadata op
// was started, recv_trailing_metadata and transport_stream_stats
// will be null.
virtual void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.
virtual void RecordEnd(const gpr_timespec& latency) = 0;
// Records an annotation on the call attempt.
// TODO(yashykt): If needed, extend this to attach attributes with
// annotations.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
};
// Interface for a factory that can create a ServerCallTracer object per server
// call.
class ServerCallTracerFactory {
public:
struct RawPointerChannelArgTag {};
virtual ~ServerCallTracerFactory() {}
virtual ServerCallTracer* CreateNewServerCallTracer() = 0;
// Use this method to get the server call tracer factory from channel args,
// instead of directly fetching it with `GetObject`.
static ServerCallTracerFactory* Get(const ChannelArgs& channel_args);
// Registers a global ServerCallTracerFactory that wil be used by default if
// no corresponding channel arg was found. It is only valid to call this
// before grpc_init(). It is the responsibility of the caller to maintain this
// for the lifetime of the process.
static void RegisterGlobal(ServerCallTracerFactory* factory);
static absl::string_view ChannelArgName();
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H

@ -0,0 +1,110 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <functional>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
namespace {
// TODO(yashykt): This filter is not really needed. We should be able to move
// this to the connected filter.
class ServerCallTracerFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerCallTracerFilter> Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
};
const grpc_channel_filter ServerCallTracerFilter::kFilter =
MakePromiseBasedFilter<ServerCallTracerFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata>(
"server_call_tracer");
absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/) {
return ServerCallTracerFilter();
}
ArenaPromise<ServerMetadataHandle> ServerCallTracerFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) {
return next_promise_factory(std::move(call_args));
}
call_tracer->RecordReceivedInitialMetadata(
call_args.client_initial_metadata.get());
call_args.server_initial_metadata->InterceptAndMap(
[call_tracer](ServerMetadataHandle metadata) {
call_tracer->RecordSendInitialMetadata(metadata.get());
return metadata;
});
GetContext<CallFinalization>()->Add(
[call_tracer](const grpc_call_final_info* final_info) {
call_tracer->RecordEnd(final_info);
});
return OnCancel(
Map(next_promise_factory(std::move(call_args)),
[call_tracer](ServerMetadataHandle md) {
call_tracer->RecordSendTrailingMetadata(md.get());
return md;
}),
[call_tracer]() { call_tracer->RecordCancel(absl::CancelledError()); });
}
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
builder->AppendFilter(&ServerCallTracerFilter::kFilter);
return true;
});
}
} // namespace grpc_core

@ -18,6 +18,7 @@
#include <limits.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/config/core_configuration.h"
@ -29,6 +30,7 @@
namespace grpc_core {
void RegisterBuiltins(CoreConfiguration::Builder* builder) {
RegisterServerCallTracerFilter(builder);
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter);

@ -61,6 +61,7 @@
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/context.h"
@ -500,7 +501,7 @@ class FilterStackCall final : public Call {
}
struct BatchControl {
FilterStackCall* call_ = nullptr;
CallTracer* call_tracer_ = nullptr;
CallTracerAnnotationInterface* call_tracer_ = nullptr;
grpc_transport_stream_op_batch op_;
// Share memory for cq_completion and notify_tag as they are never needed
// simultaneously. Each byte used in this data structure count as six bytes
@ -534,7 +535,7 @@ class FilterStackCall final : public Call {
// call_ being set to nullptr in PostCompletion method. Store the
// call_tracer_ and call_ variables locally as well because they could be
// modified by another thread after the fetch_sub operation.
CallTracer* call_tracer = call_tracer_;
CallTracerAnnotationInterface* call_tracer = call_tracer_;
FilterStackCall* call = call_;
bool is_call_trace_enabled = grpc_call_trace.enabled();
bool is_call_ops_annotate_enabled =
@ -743,6 +744,26 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
global_stats().IncrementServerCallsCreated();
call->final_op_.server.cancelled = nullptr;
call->final_op_.server.core_server = args->server;
// TODO(yashykt): In the future, we want to also enable stats and trace
// collecting from when the call is created at the transport. The idea is
// that the transport would create the call tracer and pass it in as part of
// the metadata.
auto* server_call_tracer_factory = ServerCallTracerFactory::Get(
args->server != nullptr ? args->server->channel_args() : ChannelArgs());
if (server_call_tracer_factory != nullptr) {
auto* server_call_tracer =
server_call_tracer_factory->CreateNewServerCallTracer(arena);
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
// GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
// promise-based world, we would just a single tracer object for each
// stack (call, subchannel_call, server_call.)
call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
server_call_tracer, nullptr);
call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
}
}
}
Call* parent = Call::FromC(args->parent);
@ -1173,8 +1194,8 @@ FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
*pslot = bctl;
}
bctl->call_ = this;
bctl->call_tracer_ =
static_cast<CallTracer*>(ContextGet(GRPC_CONTEXT_CALL_TRACER));
bctl->call_tracer_ = static_cast<CallTracerAnnotationInterface*>(
ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
bctl->op_.payload = &stream_op_payload_;
return bctl;
}
@ -1445,7 +1466,7 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
grpc_transport_stream_op_batch_payload* stream_op_payload;
uint32_t seen_ops = 0;
intptr_t pending_ops = 0;
CallTracer* call_tracer = nullptr;
CallTracerAnnotationInterface* call_tracer = nullptr;
for (i = 0; i < nops; i++) {
if (seen_ops & (1u << ops[i].op)) {
@ -1837,7 +1858,8 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
stream_op->on_complete = &bctl->finish_batch_;
}
call_tracer = static_cast<CallTracer*>(ContextGet(GRPC_CONTEXT_CALL_TRACER));
call_tracer = static_cast<CallTracerAnnotationInterface*>(
ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
if ((IsTraceRecordCallopsEnabled() && call_tracer != nullptr)) {
call_tracer->RecordAnnotation(absl::StrFormat(
"BATCH:%p START:%s BATCH:%s (tag:%p)", bctl,
@ -3300,6 +3322,26 @@ ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
// TODO(yashykt): In the future, we want to also enable stats and trace
// collecting from when the call is created at the transport. The idea is that
// the transport would create the call tracer and pass it in as part of the
// metadata.
auto* server_call_tracer_factory =
ServerCallTracerFactory::Get(args->server->channel_args());
if (server_call_tracer_factory != nullptr) {
auto* server_call_tracer =
server_call_tracer_factory->CreateNewServerCallTracer(arena);
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
// GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
// promise-based world, we would just a single tracer object for each
// stack (call, subchannel_call, server_call.)
ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
server_call_tracer, nullptr);
ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
}
}
MutexLock lock(mu());
ScopedContext activity_context(this);
promise_ = channel()->channel_stack()->MakeServerCallPromise(

@ -76,8 +76,8 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(
builder);
// The order of the handshaker registration is crucial here.
// We want TCP connect handshaker to be registered last so that it is added to
// the start of the handshaker list.
// We want TCP connect handshaker to be registered last so that it is added
// to the start of the handshaker list.
RegisterHttpConnectHandshaker(builder);
RegisterTCPConnectHandshaker(builder);
RegisterPriorityLbPolicy(builder);

@ -101,9 +101,11 @@ OpenCensusClientFilter::MakeCallPromise(
call_context, path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>(),
OpenCensusTracingEnabled() && tracing_enabled_);
GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr;
GPR_DEBUG_ASSERT(
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr;
return next_promise_factory(std::move(call_args));
}

@ -32,16 +32,19 @@
#include <grpcpp/opencensus.h>
#include <grpcpp/server_context.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/ext/filters/census/client_filter.h"
#include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/server_filter.h"
#include "src/cpp/ext/filters/census/server_call_tracer.h"
namespace grpc {
void RegisterOpenCensusPlugin() {
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenCensusServerCallTracerFactory);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
@ -51,13 +54,6 @@ void RegisterOpenCensusPlugin() {
&grpc::internal::OpenCensusClientFilter::kFilter);
return true;
});
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, /*priority=*/INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenCensusServerFilter::kFilter);
return true;
});
});
// Access measures to ensure they are initialized. Otherwise, creating a view

@ -56,7 +56,7 @@
namespace grpc {
namespace internal {
class OpenCensusCallTracer : public grpc_core::CallTracer {
class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
public:
class OpenCensusCallAttemptTracer : public CallAttemptTracer {
public:
@ -69,11 +69,14 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/,
uint32_t /*flags*/) override {}
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;

@ -18,13 +18,12 @@
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/server_filter.h"
#include "src/cpp/ext/filters/census/server_call_tracer.h"
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <functional>
#include <string>
#include <utility>
#include <vector>
@ -41,18 +40,15 @@
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
@ -91,27 +87,54 @@ void FilterInitialMetadata(grpc_metadata_batch* b,
} // namespace
// An OpenCensusServerCallData class will be created for every grpc call within
// a channel. It is used to store data and methods specific to that call.
// OpenCensusServerCallData is thread-compatible, however typically only 1
// thread should be interacting with a call at a time.
class OpenCensusServerCallData {
// OpenCensusServerCallTracer implementation
class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
public:
// Maximum size of server stats that are sent on the wire.
static constexpr uint32_t kMaxServerStatsLen = 16;
explicit OpenCensusServerCallData(
grpc_metadata_batch* client_initial_metadata);
OpenCensusServerCallTracer()
: start_time_(absl::Now()),
recv_message_count_(0),
sent_message_count_(0) {}
void OnSendMessage() { ++sent_message_count_; }
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void OnRecvMessage() { ++recv_message_count_; }
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override;
void OnServerTrailingMetadata(grpc_metadata_batch* server_trailing_metadata);
void RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) override {
++sent_message_count_;
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
void OnCancel() { elapsed_time_ = absl::Now() - start_time_; }
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;
void Finalize(const grpc_call_final_info* final_info);
void RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) override {
++recv_message_count_;
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {
elapsed_time_ = absl::Now() - start_time_;
}
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordAnnotation(absl::string_view annotation) override {
context_.AddSpanAnnotation(annotation, {});
}
private:
experimental::CensusContext context_;
@ -128,13 +151,10 @@ class OpenCensusServerCallData {
char stats_buf_[kMaxServerStatsLen];
};
constexpr uint32_t OpenCensusServerCallData::kMaxServerStatsLen;
OpenCensusServerCallData::OpenCensusServerCallData(
grpc_metadata_batch* client_initial_metadata)
: start_time_(absl::Now()), recv_message_count_(0), sent_message_count_(0) {
void OpenCensusServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
ServerMetadataElements sml;
FilterInitialMetadata(client_initial_metadata, &sml);
FilterInitialMetadata(recv_initial_metadata, &sml);
path_ = std::move(sml.path);
method_ = GetMethod(path_);
auto tracing_enabled = OpenCensusTracingEnabled();
@ -153,7 +173,23 @@ OpenCensusServerCallData::OpenCensusServerCallData(
}
}
void OpenCensusServerCallData::Finalize(
void OpenCensusServerCallTracer::RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
if (OpenCensusStatsEnabled() && send_trailing_metadata != nullptr) {
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
send_trailing_metadata->Set(
grpc_core::GrpcServerStatsBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
}
}
}
void OpenCensusServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
if (OpenCensusStatsEnabled()) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
@ -178,66 +214,14 @@ void OpenCensusServerCallData::Finalize(
}
}
void OpenCensusServerCallData::OnServerTrailingMetadata(
grpc_metadata_batch* server_trailing_metadata) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
if (OpenCensusStatsEnabled() && server_trailing_metadata != nullptr) {
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
server_trailing_metadata->Set(
grpc_core::GrpcServerStatsBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
}
}
}
//
// OpenCensusServerFilter
// OpenCensusServerCallTracerFactory
//
const grpc_channel_filter OpenCensusServerFilter::kFilter =
grpc_core::MakePromiseBasedFilter<
OpenCensusServerFilter, grpc_core::FilterEndpoint::kServer,
grpc_core::kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesOutboundMessages>("opencensus_server");
absl::StatusOr<OpenCensusServerFilter> OpenCensusServerFilter::Create(
const grpc_core::ChannelArgs& /*args*/,
grpc_core::ChannelFilter::Args /*filter_args*/) {
return OpenCensusServerFilter();
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
OpenCensusServerFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto* calld = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusServerCallData>(
call_args.client_initial_metadata.get());
call_args.client_to_server_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnRecvMessage();
return message;
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnSendMessage();
return message;
});
grpc_core::GetContext<grpc_core::CallFinalization>()->Add(
[calld](const grpc_call_final_info* final_info) {
calld->Finalize(final_info);
});
return grpc_core::OnCancel(Map(next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle md) {
calld->OnServerTrailingMetadata(md.get());
return md;
}),
[calld]() { calld->OnCancel(); });
grpc_core::ServerCallTracer*
OpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
grpc_core::Arena* arena) {
return arena->ManagedNew<OpenCensusServerCallTracer>();
}
} // namespace internal

@ -0,0 +1,40 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H
#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc {
namespace internal {
class OpenCensusServerCallTracerFactory
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
grpc_core::Arena* arena) override;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H

@ -1,54 +0,0 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
#include <grpc/support/port_platform.h>
#include "absl/status/statusor.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/transport.h"
namespace grpc {
namespace internal {
class OpenCensusServerFilter : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<OpenCensusServerFilter> Create(
const grpc_core::ChannelArgs& /*args*/,
grpc_core::ChannelFilter::Args /*filter_args*/);
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
private:
OpenCensusServerFilter() = default;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H

@ -460,6 +460,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args_preconditioning.cc',
'src/core/lib/channel/channel_stack.cc',
@ -470,7 +471,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
'src/core/lib/compression/compression.cc',
'src/core/lib/compression/compression_internal.cc',

@ -91,8 +91,9 @@ TEST(ChannelStackFilters, LooksAsExpected) {
std::vector<std::string>({"authority", "connected"}));
EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "connected"}));
EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "connected"}));
EXPECT_EQ(
MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "server_call_tracer", "connected"}));
EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>(
@ -101,8 +102,8 @@ TEST(ChannelStackFilters, LooksAsExpected) {
std::vector<std::string>(
{"authority", "http-client", "compression", "connected"}));
EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>(
{"server", "http-server", "compression", "connected"}));
std::vector<std::string>({"server", "http-server", "compression",
"server_call_tracer", "connected"}));
EXPECT_EQ(MakeStack(nullptr, minimal_stack_args, GRPC_CLIENT_CHANNEL),
std::vector<std::string>({"client-channel"}));
@ -115,8 +116,8 @@ TEST(ChannelStackFilters, LooksAsExpected) {
MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "message_size", "connected"}));
EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>(
{"server", "message_size", "deadline", "connected"}));
std::vector<std::string>({"server", "message_size", "deadline",
"server_call_tracer", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
@ -127,10 +128,10 @@ TEST(ChannelStackFilters, LooksAsExpected) {
std::vector<std::string>({"authority", "message_size", "http-client",
"compression", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "message_size", "deadline",
"http-server", "compression", "connected"}));
EXPECT_EQ(MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "message_size", "deadline",
"http-server", "compression",
"server_call_tracer", "connected"}));
EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL),
std::vector<std::string>({"client-channel"}));
}

@ -14,16 +14,17 @@
#include "gtest/gtest.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/server_call_tracer.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
namespace {
class TestServerCallTracerFactory : public ServerCallTracerFactory {
public:
ServerCallTracer* CreateNewServerCallTracer() override {
ServerCallTracer* CreateNewServerCallTracer(Arena* /*arena*/) override {
Crash("Not implemented");
}
};

@ -26,7 +26,6 @@
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
@ -111,44 +110,6 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
grpc_shutdown();
}
// Test filters insertion with OpenCensus plugin registered
TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) {
CoreConfiguration::Reset();
grpc::RegisterOpenCensusPlugin();
grpc_init();
// Add 2 test filters to XdsChannelStackModifier
const grpc_channel_filter test_filter_1 = {
nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr,
0, nullptr, nullptr, nullptr, nullptr, kTestFilter1};
const grpc_channel_filter test_filter_2 = {
nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr,
0, nullptr, nullptr, nullptr, nullptr, kTestFilter2};
auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>(
std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2});
grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL,
ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = "fake";
grpc_transport fake_transport = {&fake_transport_vtable};
builder.SetTransport(&fake_transport);
// Construct channel stack and verify that the test filters were successfully
// added after the census filter
ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack(&builder));
std::vector<std::string> filters;
for (const auto& entry : *builder.mutable_stack()) {
filters.push_back(entry->name);
}
filters.resize(4);
EXPECT_EQ(filters, std::vector<std::string>({"server", "opencensus_server",
kTestFilter1, kTestFilter2}));
grpc_shutdown();
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -1990,6 +1990,7 @@ src/core/lib/avl/avl.h \
src/core/lib/backoff/backoff.cc \
src/core/lib/backoff/backoff.h \
src/core/lib/channel/call_finalization.h \
src/core/lib/channel/call_tracer.cc \
src/core/lib/channel/call_tracer.h \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args.h \
@ -2013,8 +2014,7 @@ src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/server_call_tracer.cc \
src/core/lib/channel/server_call_tracer.h \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
src/core/lib/channel/status_util.h \
src/core/lib/compression/compression.cc \
@ -2390,6 +2390,7 @@ src/core/lib/matchers/matchers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/cancel_callback.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \

@ -1768,6 +1768,7 @@ src/core/lib/backoff/backoff.cc \
src/core/lib/backoff/backoff.h \
src/core/lib/channel/README.md \
src/core/lib/channel/call_finalization.h \
src/core/lib/channel/call_tracer.cc \
src/core/lib/channel/call_tracer.h \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args.h \
@ -1791,8 +1792,7 @@ src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/server_call_tracer.cc \
src/core/lib/channel/server_call_tracer.h \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
src/core/lib/channel/status_util.h \
src/core/lib/compression/compression.cc \
@ -2171,6 +2171,7 @@ src/core/lib/matchers/matchers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/cancel_callback.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \

Loading…
Cancel
Save