diff --git a/BUILD b/BUILD index 33d14f660d6..7ada5282efb 100644 --- a/BUILD +++ b/BUILD @@ -1216,8 +1216,9 @@ grpc_cc_library( "//src/core:lib/channel/channelz.cc", "//src/core:lib/channel/channelz_registry.cc", "//src/core:lib/channel/connected_channel.cc", + "//src/core:lib/channel/server_call_tracer_filter.cc", "//src/core:lib/channel/promise_based_filter.cc", - "//src/core:lib/channel/server_call_tracer.cc", + "//src/core:lib/channel/call_tracer.cc", "//src/core:lib/channel/status_util.cc", "//src/core:lib/compression/compression.cc", "//src/core:lib/compression/compression_internal.cc", @@ -1328,7 +1329,6 @@ grpc_cc_library( "//src/core:lib/channel/connected_channel.h", "//src/core:lib/channel/context.h", "//src/core:lib/channel/promise_based_filter.h", - "//src/core:lib/channel/server_call_tracer.h", "//src/core:lib/channel/status_util.h", "//src/core:lib/compression/compression_internal.h", "//src/core:lib/compression/message_compress.h", @@ -1463,6 +1463,7 @@ grpc_cc_library( "//src/core:basic_join", "//src/core:basic_seq", "//src/core:bitset", + "//src/core:cancel_callback", "//src/core:channel_args", "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", @@ -2190,7 +2191,7 @@ grpc_cc_library( "src/cpp/ext/filters/census/grpc_plugin.cc", "src/cpp/ext/filters/census/measures.cc", "src/cpp/ext/filters/census/rpc_encoding.cc", - "src/cpp/ext/filters/census/server_filter.cc", + "src/cpp/ext/filters/census/server_call_tracer.cc", "src/cpp/ext/filters/census/views.cc", ], hdrs = [ @@ -2201,7 +2202,7 @@ grpc_cc_library( "src/cpp/ext/filters/census/measures.h", "src/cpp/ext/filters/census/open_census_call_tracer.h", "src/cpp/ext/filters/census/rpc_encoding.h", - "src/cpp/ext/filters/census/server_filter.h", + "src/cpp/ext/filters/census/server_call_tracer.h", ], external_deps = [ "absl/base", @@ -2231,16 +2232,12 @@ grpc_cc_library( "grpc_public_hdrs", "//src/core:arena", "//src/core:arena_promise", - "//src/core:cancel_callback", "//src/core:channel_args", "//src/core:channel_fwd", "//src/core:channel_stack_type", "//src/core:context", "//src/core:error", "//src/core:experiments", - "//src/core:map", - "//src/core:pipe", - "//src/core:poll", "//src/core:slice", "//src/core:slice_buffer", "//src/core:slice_refcount", diff --git a/CMakeLists.txt b/CMakeLists.txt index d9cab119d93..eb57fd35f67 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2155,6 +2155,7 @@ add_library(grpc src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/backoff/backoff.cc + src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc src/core/lib/channel/channel_stack.cc @@ -2165,7 +2166,7 @@ add_library(grpc src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer.cc + src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc @@ -2844,6 +2845,7 @@ add_library(grpc_unsecure src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/backoff/backoff.cc + src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc src/core/lib/channel/channel_stack.cc @@ -2854,7 +2856,7 @@ add_library(grpc_unsecure src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer.cc + src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc @@ -4363,6 +4365,7 @@ add_library(grpc_authorization_provider src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc + src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc src/core/lib/channel/channel_stack.cc @@ -4373,7 +4376,7 @@ add_library(grpc_authorization_provider src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer.cc + src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc @@ -11424,6 +11427,7 @@ add_executable(frame_test src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc + src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc src/core/lib/channel/channel_stack.cc @@ -11434,7 +11438,7 @@ add_executable(frame_test src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer.cc + src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc diff --git a/Makefile b/Makefile index 63f54ee9d68..f5710676a33 100644 --- a/Makefile +++ b/Makefile @@ -1404,6 +1404,7 @@ LIBGRPC_SRC = \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/sockaddr_utils.cc \ src/core/lib/backoff/backoff.cc \ + src/core/lib/channel/call_tracer.cc \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args_preconditioning.cc \ src/core/lib/channel/channel_stack.cc \ @@ -1414,7 +1415,7 @@ LIBGRPC_SRC = \ src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/promise_based_filter.cc \ - src/core/lib/channel/server_call_tracer.cc \ + src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ @@ -1946,6 +1947,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/sockaddr_utils.cc \ src/core/lib/backoff/backoff.cc \ + src/core/lib/channel/call_tracer.cc \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args_preconditioning.cc \ src/core/lib/channel/channel_stack.cc \ @@ -1956,7 +1958,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/promise_based_filter.cc \ - src/core/lib/channel/server_call_tracer.cc \ + src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7d93f037654..7f06ab59874 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -781,7 +781,6 @@ libs: - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h - src/core/lib/channel/promise_based_filter.h - - src/core/lib/channel/server_call_tracer.h - src/core/lib/channel/status_util.h - src/core/lib/compression/compression_internal.h - src/core/lib/compression/message_compress.h @@ -942,6 +941,7 @@ libs: - src/core/lib/matchers/matchers.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h + - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h @@ -1553,6 +1553,7 @@ libs: - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/backoff/backoff.cc + - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc - src/core/lib/channel/channel_stack.cc @@ -1563,7 +1564,7 @@ libs: - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/promise_based_filter.cc - - src/core/lib/channel/server_call_tracer.cc + - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc @@ -2123,7 +2124,6 @@ libs: - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h - src/core/lib/channel/promise_based_filter.h - - src/core/lib/channel/server_call_tracer.h - src/core/lib/channel/status_util.h - src/core/lib/compression/compression_internal.h - src/core/lib/compression/message_compress.h @@ -2281,6 +2281,7 @@ libs: - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h + - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h @@ -2508,6 +2509,7 @@ libs: - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/backoff/backoff.cc + - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc - src/core/lib/channel/channel_stack.cc @@ -2518,7 +2520,7 @@ libs: - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/promise_based_filter.cc - - src/core/lib/channel/server_call_tracer.cc + - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc @@ -3586,7 +3588,6 @@ libs: - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h - src/core/lib/channel/promise_based_filter.h - - src/core/lib/channel/server_call_tracer.h - src/core/lib/channel/status_util.h - src/core/lib/compression/compression_internal.h - src/core/lib/compression/message_compress.h @@ -3738,6 +3739,7 @@ libs: - src/core/lib/matchers/matchers.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h + - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h @@ -3850,6 +3852,7 @@ libs: - src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc + - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc - src/core/lib/channel/channel_stack.cc @@ -3860,7 +3863,7 @@ libs: - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/promise_based_filter.cc - - src/core/lib/channel/server_call_tracer.cc + - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc @@ -7395,7 +7398,6 @@ targets: - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h - src/core/lib/channel/promise_based_filter.h - - src/core/lib/channel/server_call_tracer.h - src/core/lib/channel/status_util.h - src/core/lib/compression/compression_internal.h - src/core/lib/compression/message_compress.h @@ -7546,6 +7548,7 @@ targets: - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h + - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h @@ -7642,6 +7645,7 @@ targets: - src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc + - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc - src/core/lib/channel/channel_stack.cc @@ -7652,7 +7656,7 @@ targets: - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/promise_based_filter.cc - - src/core/lib/channel/server_call_tracer.cc + - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc diff --git a/config.m4 b/config.m4 index fbd3040c77a..92612e073b6 100644 --- a/config.m4 +++ b/config.m4 @@ -485,6 +485,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/sockaddr_utils.cc \ src/core/lib/backoff/backoff.cc \ + src/core/lib/channel/call_tracer.cc \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args_preconditioning.cc \ src/core/lib/channel/channel_stack.cc \ @@ -495,7 +496,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/promise_based_filter.cc \ - src/core/lib/channel/server_call_tracer.cc \ + src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ diff --git a/config.w32 b/config.w32 index 281a7095214..6da67e3c94c 100644 --- a/config.w32 +++ b/config.w32 @@ -451,6 +451,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\address_utils\\parse_address.cc " + "src\\core\\lib\\address_utils\\sockaddr_utils.cc " + "src\\core\\lib\\backoff\\backoff.cc " + + "src\\core\\lib\\channel\\call_tracer.cc " + "src\\core\\lib\\channel\\channel_args.cc " + "src\\core\\lib\\channel\\channel_args_preconditioning.cc " + "src\\core\\lib\\channel\\channel_stack.cc " + @@ -461,7 +462,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\channel\\channelz_registry.cc " + "src\\core\\lib\\channel\\connected_channel.cc " + "src\\core\\lib\\channel\\promise_based_filter.cc " + - "src\\core\\lib\\channel\\server_call_tracer.cc " + + "src\\core\\lib\\channel\\server_call_tracer_filter.cc " + "src\\core\\lib\\channel\\status_util.cc " + "src\\core\\lib\\compression\\compression.cc " + "src\\core\\lib\\compression\\compression_internal.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 950eb660358..5656c0bcc96 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -725,7 +725,6 @@ Pod::Spec.new do |s| 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', 'src/core/lib/channel/promise_based_filter.h', - 'src/core/lib/channel/server_call_tracer.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/compression/compression_internal.h', 'src/core/lib/compression/message_compress.h', @@ -912,6 +911,7 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', + 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', @@ -1660,7 +1660,6 @@ Pod::Spec.new do |s| 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', 'src/core/lib/channel/promise_based_filter.h', - 'src/core/lib/channel/server_call_tracer.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/compression/compression_internal.h', 'src/core/lib/compression/message_compress.h', @@ -1847,6 +1846,7 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', + 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index a32a3f5feec..5f1b1963a26 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1086,6 +1086,7 @@ Pod::Spec.new do |s| 'src/core/lib/backoff/backoff.cc', 'src/core/lib/backoff/backoff.h', 'src/core/lib/channel/call_finalization.h', + 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/call_tracer.h', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args.h', @@ -1109,8 +1110,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/context.h', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/promise_based_filter.h', - 'src/core/lib/channel/server_call_tracer.cc', - 'src/core/lib/channel/server_call_tracer.h', + 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/channel/status_util.h', 'src/core/lib/compression/compression.cc', @@ -1486,6 +1486,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/activity.cc', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', + 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', @@ -2348,7 +2349,6 @@ Pod::Spec.new do |s| 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', 'src/core/lib/channel/promise_based_filter.h', - 'src/core/lib/channel/server_call_tracer.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/compression/compression_internal.h', 'src/core/lib/compression/message_compress.h', @@ -2535,6 +2535,7 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', + 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', diff --git a/grpc.gemspec b/grpc.gemspec index 7c042a5ea8b..fdbd9757882 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -995,6 +995,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/backoff/backoff.cc ) s.files += %w( src/core/lib/backoff/backoff.h ) s.files += %w( src/core/lib/channel/call_finalization.h ) + s.files += %w( src/core/lib/channel/call_tracer.cc ) s.files += %w( src/core/lib/channel/call_tracer.h ) s.files += %w( src/core/lib/channel/channel_args.cc ) s.files += %w( src/core/lib/channel/channel_args.h ) @@ -1018,8 +1019,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/context.h ) s.files += %w( src/core/lib/channel/promise_based_filter.cc ) s.files += %w( src/core/lib/channel/promise_based_filter.h ) - s.files += %w( src/core/lib/channel/server_call_tracer.cc ) - s.files += %w( src/core/lib/channel/server_call_tracer.h ) + s.files += %w( src/core/lib/channel/server_call_tracer_filter.cc ) s.files += %w( src/core/lib/channel/status_util.cc ) s.files += %w( src/core/lib/channel/status_util.h ) s.files += %w( src/core/lib/compression/compression.cc ) @@ -1395,6 +1395,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/activity.cc ) s.files += %w( src/core/lib/promise/activity.h ) s.files += %w( src/core/lib/promise/arena_promise.h ) + s.files += %w( src/core/lib/promise/cancel_callback.h ) s.files += %w( src/core/lib/promise/context.h ) s.files += %w( src/core/lib/promise/detail/basic_join.h ) s.files += %w( src/core/lib/promise/detail/basic_seq.h ) diff --git a/grpc.gyp b/grpc.gyp index 5b39a3e371a..cba8fd1e7aa 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -817,6 +817,7 @@ 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', + 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', 'src/core/lib/channel/channel_stack.cc', @@ -827,7 +828,7 @@ 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/promise_based_filter.cc', - 'src/core/lib/channel/server_call_tracer.cc', + 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', @@ -1301,6 +1302,7 @@ 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', + 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', 'src/core/lib/channel/channel_stack.cc', @@ -1311,7 +1313,7 @@ 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/promise_based_filter.cc', - 'src/core/lib/channel/server_call_tracer.cc', + 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', @@ -1809,6 +1811,7 @@ 'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c', 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', + 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', 'src/core/lib/channel/channel_stack.cc', @@ -1819,7 +1822,7 @@ 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/promise_based_filter.cc', - 'src/core/lib/channel/server_call_tracer.cc', + 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', diff --git a/package.xml b/package.xml index d38138a6b60..bf49b7242cc 100644 --- a/package.xml +++ b/package.xml @@ -977,6 +977,7 @@ + @@ -1000,8 +1001,7 @@ - - + @@ -1377,6 +1377,7 @@ + diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 2c98872a3fb..9d1f2574f2d 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1907,8 +1907,8 @@ absl::optional ClientChannel::CallData::CheckResolution( } // If the call was queued, add trace annotation. if (was_queued) { - auto* call_tracer = static_cast( - call_context()[GRPC_CONTEXT_CALL_TRACER].value); + auto* call_tracer = static_cast( + call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); if (call_tracer != nullptr) { call_tracer->RecordAnnotation("Delayed name resolution complete."); } @@ -2504,12 +2504,14 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor namespace { -CallTracer::CallAttemptTracer* GetCallAttemptTracer( +ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer( grpc_call_context_element* context, bool is_transparent_retry) { - auto* call_tracer = - static_cast(context[GRPC_CONTEXT_CALL_TRACER].value); + auto* call_tracer = static_cast( + context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); if (call_tracer == nullptr) return nullptr; - return call_tracer->StartNewAttempt(is_transparent_retry); + auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); + context[GRPC_CONTEXT_CALL_TRACER].value = tracer; + return tracer; } } // namespace @@ -2523,9 +2525,8 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall( ? "LoadBalancedCall" : nullptr), chand_(chand), - call_dispatch_controller_(call_dispatch_controller), - call_attempt_tracer_( - GetCallAttemptTracer(call_context, is_transparent_retry)) { + call_dispatch_controller_(call_dispatch_controller) { + CreateCallAttemptTracer(call_context, is_transparent_retry); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this); } @@ -2539,10 +2540,10 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() { void ClientChannel::LoadBalancedCall::Orphan() { // Compute latency and report it to the tracer. - if (call_attempt_tracer_ != nullptr) { + if (call_attempt_tracer() != nullptr) { gpr_timespec latency = gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_); - call_attempt_tracer_->RecordEnd(latency); + call_attempt_tracer()->RecordEnd(latency); } Unref(); } @@ -2552,8 +2553,8 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion( grpc_transport_stream_stats* transport_stream_stats, absl::string_view peer_address) { // If we have a tracer, notify it. - if (call_attempt_tracer_ != nullptr) { - call_attempt_tracer_->RecordReceivedTrailingMetadata( + if (call_attempt_tracer() != nullptr) { + call_attempt_tracer()->RecordReceivedTrailingMetadata( status, recv_trailing_metadata, transport_stream_stats); } // If the LB policy requested a callback for trailing metadata, invoke @@ -2647,8 +2648,8 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( } // Pick is complete. // If it was queued, add a trace annotation. - if (was_queued && call_attempt_tracer_ != nullptr) { - call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete."); + if (was_queued && call_attempt_tracer() != nullptr) { + call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete."); } // If the pick failed, fail the call. if (!error.ok()) { @@ -2915,7 +2916,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch( GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: batch started from above: %s, " - "call_attempt_tracer_=%p", + "call_attempt_tracer()=%p", chand(), this, grpc_transport_stream_op_batch_string(batch, false).c_str(), call_attempt_tracer()); @@ -2931,10 +2932,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch( call_attempt_tracer()->RecordSendInitialMetadata( batch->payload->send_initial_metadata.send_initial_metadata); } - if (batch->send_message) { - call_attempt_tracer()->RecordSendMessage( - *batch->payload->send_message.send_message); - } if (batch->send_trailing_metadata) { call_attempt_tracer()->RecordSendTrailingMetadata( batch->payload->send_trailing_metadata.send_trailing_metadata); @@ -2950,13 +2947,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch( batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &recv_initial_metadata_ready_; } - if (batch->recv_message) { - recv_message_ = batch->payload->recv_message.recv_message; - original_recv_message_ready_ = - batch->payload->recv_message.recv_message_ready; - GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr); - batch->payload->recv_message.recv_message_ready = &recv_message_ready_; - } } // Intercept recv_trailing_metadata even if there is no call tracer, // since we may need to notify the LB policy about trailing metadata. @@ -3046,32 +3036,19 @@ void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady( if (error.ok()) { // recv_initial_metadata_flags is not populated for clients self->call_attempt_tracer()->RecordReceivedInitialMetadata( - self->recv_initial_metadata_, 0 /* recv_initial_metadata_flags */); + self->recv_initial_metadata_); } Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_, error); } -void ClientChannel::FilterBasedLoadBalancedCall::RecvMessageReady( - void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, "chand=%p lb_call=%p: got recv_message_ready: error=%s", - self->chand(), self, StatusToString(error).c_str()); - } - if (self->recv_message_->has_value()) { - self->call_attempt_tracer()->RecordReceivedMessage(**self->recv_message_); - } - Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_, error); -} - void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s " - "call_attempt_tracer_=%p lb_subchannel_call_tracker_=%p " + "call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p " "failure_error_=%s", self->chand(), self, StatusToString(error).c_str(), self->call_attempt_tracer(), self->lb_subchannel_call_tracker(), diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 5deb23ad16f..bfc35c27b4d 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -63,7 +63,6 @@ #include "src/core/lib/resolver/resolver.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/service_config/service_config.h" -#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata_batch.h" @@ -388,8 +387,9 @@ class ClientChannel::LoadBalancedCall ConfigSelector::CallDispatchController* call_dispatch_controller() const { return call_dispatch_controller_; } - CallTracer::CallAttemptTracer* call_attempt_tracer() const { - return call_attempt_tracer_; + ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { + return static_cast( + call_context()[GRPC_CONTEXT_CALL_TRACER].value); } gpr_cycle_counter lb_call_start_time() const { return lb_call_start_time_; } ConnectedSubchannel* connected_subchannel() const { @@ -442,7 +442,6 @@ class ClientChannel::LoadBalancedCall ClientChannel* chand_; ConfigSelector::CallDispatchController* call_dispatch_controller_; - CallTracer::CallAttemptTracer* call_attempt_tracer_; gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter(); @@ -525,7 +524,6 @@ class ClientChannel::FilterBasedLoadBalancedCall static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error); static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); - static void RecvMessageReady(void* arg, grpc_error_handle error); static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); // Called to perform a pick, both when the call is initially started @@ -568,11 +566,6 @@ class ClientChannel::FilterBasedLoadBalancedCall grpc_closure recv_initial_metadata_ready_; grpc_closure* original_recv_initial_metadata_ready_ = nullptr; - // For intercepting recv_message_ready. - absl::optional* recv_message_ = nullptr; - grpc_closure recv_message_ready_; - grpc_closure* original_recv_message_ready_ = nullptr; - // For intercepting recv_trailing_metadata_ready. grpc_metadata_batch* recv_trailing_metadata_ = nullptr; grpc_transport_stream_stats* transport_stream_stats_ = nullptr; diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index 03171819c68..a719f5b9132 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -36,6 +36,7 @@ #include #include "src/core/ext/filters/message_size/message_size_filter.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" @@ -113,6 +114,12 @@ MessageHandle CompressionFilter::CompressMessage( gpr_log(GPR_ERROR, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", message->payload()->Length(), algorithm, message->flags()); } + auto* call_context = GetContext(); + auto* call_tracer = static_cast( + call_context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer != nullptr) { + call_tracer->RecordSendMessage(*message->payload()); + } // Check if we're allowed to compress this message // (apps might want to disable compression for certain messages to avoid // crime/beast like vulns). @@ -143,6 +150,9 @@ MessageHandle CompressionFilter::CompressMessage( } tmp.Swap(payload); flags |= GRPC_WRITE_INTERNAL_COMPRESS; + if (call_tracer != nullptr) { + call_tracer->RecordSendCompressedMessage(*message->payload()); + } } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { const char* algo_name; @@ -163,6 +173,12 @@ absl::StatusOr CompressionFilter::DecompressMessage( message->payload()->Length(), args.max_recv_message_length.value_or(-1), args.algorithm); } + auto* call_context = GetContext(); + auto* call_tracer = static_cast( + call_context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer != nullptr) { + call_tracer->RecordReceivedMessage(*message->payload()); + } // Check max message length. if (args.max_recv_message_length.has_value() && message->payload()->Length() > @@ -189,6 +205,9 @@ absl::StatusOr CompressionFilter::DecompressMessage( message->payload()->Swap(&decompressed_slices); message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; + if (call_tracer != nullptr) { + call_tracer->RecordReceivedDecompressedMessage(*message->payload()); + } return std::move(message); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5d183014274..32616181774 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -211,9 +211,9 @@ void MaybeRecordTransportAnnotation(grpc_chttp2_stream* s, if (!grpc_core::IsTraceRecordCallopsEnabled()) { return; } - grpc_core::CallTracer* call_tracer = static_cast( + auto* call_tracer = static_cast( static_cast( - s->context)[GRPC_CONTEXT_CALL_TRACER] + s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE] .value); if (!call_tracer) { return; diff --git a/src/core/ext/xds/xds_channel_stack_modifier.cc b/src/core/ext/xds/xds_channel_stack_modifier.cc index f6ec2a3a7fc..ff4acbdfb4e 100644 --- a/src/core/ext/xds/xds_channel_stack_modifier.cc +++ b/src/core/ext/xds/xds_channel_stack_modifier.cc @@ -62,8 +62,7 @@ bool XdsChannelStackModifier::ModifyChannelStack(ChannelStackBuilder* builder) { auto it = builder->mutable_stack()->begin(); while (it != builder->mutable_stack()->end()) { const char* filter_name_at_it = (*it)->name; - if (strcmp("census_server", filter_name_at_it) == 0 || - strcmp("opencensus_server", filter_name_at_it) == 0) { + if (strcmp("census_server", filter_name_at_it) == 0) { break; } ++it; diff --git a/src/core/lib/channel/server_call_tracer.cc b/src/core/lib/channel/call_tracer.cc similarity index 96% rename from src/core/lib/channel/server_call_tracer.cc rename to src/core/lib/channel/call_tracer.cc index ccd2a5020a9..e6dd42e57ed 100644 --- a/src/core/lib/channel/server_call_tracer.cc +++ b/src/core/lib/channel/call_tracer.cc @@ -18,7 +18,7 @@ #include -#include "src/core/lib/channel/server_call_tracer.h" +#include "src/core/lib/channel/call_tracer.h" namespace grpc_core { diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index 17483ea05f4..9ff70cef1cc 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -21,77 +21,141 @@ #include -#include - #include "absl/status/status.h" #include "absl/strings/string_view.h" #include +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" namespace grpc_core { +// The interface hierarchy is as follows - +// CallTracerAnnotationInterface +// / \ +// ClientCallTracer CallTracerInterface +// / \ +// CallAttemptTracer ServerCallTracer + +// The base class for all tracer implementations. +class CallTracerAnnotationInterface { + public: + virtual ~CallTracerAnnotationInterface() {} + // Records an annotation on the call attempt. + // TODO(yashykt): If needed, extend this to attach attributes with + // annotations. + virtual void RecordAnnotation(absl::string_view annotation) = 0; +}; + +// The base class for CallAttemptTracer and ServerCallTracer. +// TODO(yashykt): What's a better name for this? +class CallTracerInterface : public CallTracerAnnotationInterface { + public: + ~CallTracerInterface() override {} + // Please refer to `grpc_transport_stream_op_batch_payload` for details on + // arguments. + virtual void RecordSendInitialMetadata( + grpc_metadata_batch* send_initial_metadata) = 0; + virtual void RecordSendTrailingMetadata( + grpc_metadata_batch* send_trailing_metadata) = 0; + virtual void RecordSendMessage(const SliceBuffer& send_message) = 0; + // Only invoked if message was actually compressed. + virtual void RecordSendCompressedMessage( + const SliceBuffer& send_compressed_message) = 0; + // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` + // methods should only be invoked when the metadata/message was + // successfully received, i.e., without any error. + virtual void RecordReceivedInitialMetadata( + grpc_metadata_batch* recv_initial_metadata) = 0; + virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0; + // Only invoked if message was actually decompressed. + virtual void RecordReceivedDecompressedMessage( + const SliceBuffer& recv_decompressed_message) = 0; + virtual void RecordCancel(grpc_error_handle cancel_error) = 0; +}; + // Interface for a tracer that records activities on a call. Actual attempts for // this call are traced with CallAttemptTracer after invoking RecordNewAttempt() -// on the CallTracer object. -class CallTracer { +// on the ClientCallTracer object. +class ClientCallTracer : public CallTracerAnnotationInterface { public: // Interface for a tracer that records activities on a particular call // attempt. // (A single RPC can have multiple attempts due to retry/hedging policies or // as transparent retry attempts.) - class CallAttemptTracer { + class CallAttemptTracer : public CallTracerInterface { public: - virtual ~CallAttemptTracer() {} - // Please refer to `grpc_transport_stream_op_batch_payload` for details on - // arguments. - virtual void RecordSendInitialMetadata( - grpc_metadata_batch* send_initial_metadata) = 0; - virtual void RecordSendTrailingMetadata( - grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const SliceBuffer& send_message) = 0; - // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` - // methods should only be invoked when the metadata/message was - // successfully received, i.e., without any error. - virtual void RecordReceivedInitialMetadata( - grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0; - virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0; + ~CallAttemptTracer() override {} + // TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata` + // and `RecordEnd` should be moved into CallTracerInterface. // If the call was cancelled before the recv_trailing_metadata op // was started, recv_trailing_metadata and transport_stream_stats // will be null. virtual void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) = 0; - virtual void RecordCancel(grpc_error_handle cancel_error) = 0; // Should be the last API call to the object. Once invoked, the tracer // library is free to destroy the object. virtual void RecordEnd(const gpr_timespec& latency) = 0; - // Records an annotation on the call attempt. - // TODO(yashykt): If needed, extend this to attach attributes with - // annotations. - virtual void RecordAnnotation(absl::string_view annotation) = 0; }; - virtual ~CallTracer() {} + ~ClientCallTracer() override {} // Records a new attempt for the associated call. \a transparent denotes // whether the attempt is being made as a transparent retry or as a // non-transparent retry/heding attempt. (There will be at least one attempt - // even if the call is not being retried.) The `CallTracer` object retains - // ownership to the newly created `CallAttemptTracer` object. RecordEnd() - // serves as an indication that the call stack is done with all API calls, and - // the tracer library is free to destroy it after that. + // even if the call is not being retried.) The `ClientCallTracer` object + // retains ownership to the newly created `CallAttemptTracer` object. + // RecordEnd() serves as an indication that the call stack is done with all + // API calls, and the tracer library is free to destroy it after that. virtual CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) = 0; - // Records an annotation on the call attempt. - // TODO(yashykt): If needed, extend this to attach attributes with - // annotations. - virtual void RecordAnnotation(absl::string_view annotation) = 0; }; +// Interface for a tracer that records activities on a server call. +class ServerCallTracer : public CallTracerInterface { + public: + ~ServerCallTracer() override {} + // TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata` + // and `RecordEnd` should be moved into CallTracerInterface. + virtual void RecordReceivedTrailingMetadata( + grpc_metadata_batch* recv_trailing_metadata) = 0; + // Should be the last API call to the object. Once invoked, the tracer + // library is free to destroy the object. + virtual void RecordEnd(const grpc_call_final_info* final_info) = 0; +}; + +// Interface for a factory that can create a ServerCallTracer object per +// server call. +class ServerCallTracerFactory { + public: + struct RawPointerChannelArgTag {}; + + virtual ~ServerCallTracerFactory() {} + + virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0; + + // Use this method to get the server call tracer factory from channel args, + // instead of directly fetching it with `GetObject`. + static ServerCallTracerFactory* Get(const ChannelArgs& channel_args); + + // Registers a global ServerCallTracerFactory that wil be used by default if + // no corresponding channel arg was found. It is only valid to call this + // before grpc_init(). It is the responsibility of the caller to maintain + // this for the lifetime of the process. + static void RegisterGlobal(ServerCallTracerFactory* factory); + + static absl::string_view ChannelArgName(); +}; + +void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder); + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_CHANNEL_CALL_TRACER_H diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index ea67e4d7654..3325d1d577d 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -36,7 +36,14 @@ typedef enum { /// Value is a \a census_context. GRPC_CONTEXT_TRACING, - /// Value is a CallTracer object. + /// Value is a CallTracerAnnotationInterface. (ClientCallTracer object on the + /// client-side call, or ServerCallTracer on the server-side.) + GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, + + /// Value is a CallTracerInterface (ServerCallTracer on the server-side, + /// CallAttemptTracer on a subchannel call.) + /// TODO(yashykt): Maybe come up with a better name. This will go away in the + /// future anyway, so not super important. GRPC_CONTEXT_CALL_TRACER, /// Reserved for traffic_class_context. diff --git a/src/core/lib/channel/server_call_tracer.h b/src/core/lib/channel/server_call_tracer.h deleted file mode 100644 index db8df0dc125..00000000000 --- a/src/core/lib/channel/server_call_tracer.h +++ /dev/null @@ -1,97 +0,0 @@ -// -// -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// - -#ifndef GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H -#define GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H - -#include - -#include - -#include "absl/status/status.h" -#include "absl/strings/string_view.h" - -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/slice/slice_buffer.h" -#include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/transport.h" - -namespace grpc_core { - -// Interface for a tracer that records activities on a server call. -class ServerCallTracer { - public: - virtual ~ServerCallTracer() {} - // Please refer to `grpc_transport_stream_op_batch_payload` for details on - // arguments. - virtual void RecordSendInitialMetadata( - grpc_metadata_batch* send_initial_metadata) = 0; - virtual void RecordSendTrailingMetadata( - grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const SliceBuffer& send_message) = 0; - // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` - // methods should only be invoked when the metadata/message was - // successfully received, i.e., without any error. - virtual void RecordReceivedInitialMetadata( - grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0; - virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0; - // If the call was cancelled before the recv_trailing_metadata op - // was started, recv_trailing_metadata and transport_stream_stats - // will be null. - virtual void RecordReceivedTrailingMetadata( - absl::Status status, grpc_metadata_batch* recv_trailing_metadata, - const grpc_transport_stream_stats* transport_stream_stats) = 0; - virtual void RecordCancel(grpc_error_handle cancel_error) = 0; - // Should be the last API call to the object. Once invoked, the tracer - // library is free to destroy the object. - virtual void RecordEnd(const gpr_timespec& latency) = 0; - // Records an annotation on the call attempt. - // TODO(yashykt): If needed, extend this to attach attributes with - // annotations. - virtual void RecordAnnotation(absl::string_view annotation) = 0; -}; - -// Interface for a factory that can create a ServerCallTracer object per server -// call. -class ServerCallTracerFactory { - public: - struct RawPointerChannelArgTag {}; - - virtual ~ServerCallTracerFactory() {} - - virtual ServerCallTracer* CreateNewServerCallTracer() = 0; - - // Use this method to get the server call tracer factory from channel args, - // instead of directly fetching it with `GetObject`. - static ServerCallTracerFactory* Get(const ChannelArgs& channel_args); - - // Registers a global ServerCallTracerFactory that wil be used by default if - // no corresponding channel arg was found. It is only valid to call this - // before grpc_init(). It is the responsibility of the caller to maintain this - // for the lifetime of the process. - static void RegisterGlobal(ServerCallTracerFactory* factory); - - static absl::string_view ChannelArgName(); -}; - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_H diff --git a/src/core/lib/channel/server_call_tracer_filter.cc b/src/core/lib/channel/server_call_tracer_filter.cc new file mode 100644 index 00000000000..893f33e4e5b --- /dev/null +++ b/src/core/lib/channel/server_call_tracer_filter.cc @@ -0,0 +1,110 @@ +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include "src/core/lib/channel/call_finalization.h" +#include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/channel/context.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/promise/cancel_callback.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/surface/channel_stack_type.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +namespace { + +// TODO(yashykt): This filter is not really needed. We should be able to move +// this to the connected filter. +class ServerCallTracerFilter : public ChannelFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/); + + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; +}; + +const grpc_channel_filter ServerCallTracerFilter::kFilter = + MakePromiseBasedFilter( + "server_call_tracer"); + +absl::StatusOr ServerCallTracerFilter::Create( + const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/) { + return ServerCallTracerFilter(); +} + +ArenaPromise ServerCallTracerFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto* call_context = GetContext(); + auto* call_tracer = static_cast( + call_context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer == nullptr) { + return next_promise_factory(std::move(call_args)); + } + call_tracer->RecordReceivedInitialMetadata( + call_args.client_initial_metadata.get()); + call_args.server_initial_metadata->InterceptAndMap( + [call_tracer](ServerMetadataHandle metadata) { + call_tracer->RecordSendInitialMetadata(metadata.get()); + return metadata; + }); + GetContext()->Add( + [call_tracer](const grpc_call_final_info* final_info) { + call_tracer->RecordEnd(final_info); + }); + return OnCancel( + Map(next_promise_factory(std::move(call_args)), + [call_tracer](ServerMetadataHandle md) { + call_tracer->RecordSendTrailingMetadata(md.get()); + return md; + }), + [call_tracer]() { call_tracer->RecordCancel(absl::CancelledError()); }); +} + +} // namespace + +void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) { + builder->channel_init()->RegisterStage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + [](ChannelStackBuilder* builder) { + builder->AppendFilter(&ServerCallTracerFilter::kFilter); + return true; + }); +} + +} // namespace grpc_core diff --git a/src/core/lib/surface/builtins.cc b/src/core/lib/surface/builtins.cc index 2b50d2c3536..bb20388b401 100644 --- a/src/core/lib/surface/builtins.cc +++ b/src/core/lib/surface/builtins.cc @@ -18,6 +18,7 @@ #include +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/config/core_configuration.h" @@ -29,6 +30,7 @@ namespace grpc_core { void RegisterBuiltins(CoreConfiguration::Builder* builder) { + RegisterServerCallTracerFilter(builder); builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 0633b9b47da..f2730770915 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -61,6 +61,7 @@ #include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/context.h" @@ -500,7 +501,7 @@ class FilterStackCall final : public Call { } struct BatchControl { FilterStackCall* call_ = nullptr; - CallTracer* call_tracer_ = nullptr; + CallTracerAnnotationInterface* call_tracer_ = nullptr; grpc_transport_stream_op_batch op_; // Share memory for cq_completion and notify_tag as they are never needed // simultaneously. Each byte used in this data structure count as six bytes @@ -534,7 +535,7 @@ class FilterStackCall final : public Call { // call_ being set to nullptr in PostCompletion method. Store the // call_tracer_ and call_ variables locally as well because they could be // modified by another thread after the fetch_sub operation. - CallTracer* call_tracer = call_tracer_; + CallTracerAnnotationInterface* call_tracer = call_tracer_; FilterStackCall* call = call_; bool is_call_trace_enabled = grpc_call_trace.enabled(); bool is_call_ops_annotate_enabled = @@ -743,6 +744,26 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, global_stats().IncrementServerCallsCreated(); call->final_op_.server.cancelled = nullptr; call->final_op_.server.core_server = args->server; + // TODO(yashykt): In the future, we want to also enable stats and trace + // collecting from when the call is created at the transport. The idea is + // that the transport would create the call tracer and pass it in as part of + // the metadata. + auto* server_call_tracer_factory = ServerCallTracerFactory::Get( + args->server != nullptr ? args->server->channel_args() : ChannelArgs()); + if (server_call_tracer_factory != nullptr) { + auto* server_call_tracer = + server_call_tracer_factory->CreateNewServerCallTracer(arena); + if (server_call_tracer != nullptr) { + // Note that we are setting both + // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and + // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future + // promise-based world, we would just a single tracer object for each + // stack (call, subchannel_call, server_call.) + call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, + server_call_tracer, nullptr); + call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); + } + } } Call* parent = Call::FromC(args->parent); @@ -1173,8 +1194,8 @@ FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl( *pslot = bctl; } bctl->call_ = this; - bctl->call_tracer_ = - static_cast(ContextGet(GRPC_CONTEXT_CALL_TRACER)); + bctl->call_tracer_ = static_cast( + ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); bctl->op_.payload = &stream_op_payload_; return bctl; } @@ -1445,7 +1466,7 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops, grpc_transport_stream_op_batch_payload* stream_op_payload; uint32_t seen_ops = 0; intptr_t pending_ops = 0; - CallTracer* call_tracer = nullptr; + CallTracerAnnotationInterface* call_tracer = nullptr; for (i = 0; i < nops; i++) { if (seen_ops & (1u << ops[i].op)) { @@ -1837,7 +1858,8 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops, stream_op->on_complete = &bctl->finish_batch_; } - call_tracer = static_cast(ContextGet(GRPC_CONTEXT_CALL_TRACER)); + call_tracer = static_cast( + ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); if ((IsTraceRecordCallopsEnabled() && call_tracer != nullptr)) { call_tracer->RecordAnnotation(absl::StrFormat( "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl, @@ -3300,6 +3322,26 @@ ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena, if (channelz_node != nullptr) { channelz_node->RecordCallStarted(); } + // TODO(yashykt): In the future, we want to also enable stats and trace + // collecting from when the call is created at the transport. The idea is that + // the transport would create the call tracer and pass it in as part of the + // metadata. + auto* server_call_tracer_factory = + ServerCallTracerFactory::Get(args->server->channel_args()); + if (server_call_tracer_factory != nullptr) { + auto* server_call_tracer = + server_call_tracer_factory->CreateNewServerCallTracer(arena); + if (server_call_tracer != nullptr) { + // Note that we are setting both + // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and + // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future + // promise-based world, we would just a single tracer object for each + // stack (call, subchannel_call, server_call.) + ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, + server_call_tracer, nullptr); + ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); + } + } MutexLock lock(mu()); ScopedContext activity_context(this); promise_ = channel()->channel_stack()->MakeServerCallPromise( diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index c120471fbec..cf0d0b3c0c6 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -76,8 +76,8 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning( builder); // The order of the handshaker registration is crucial here. - // We want TCP connect handshaker to be registered last so that it is added to - // the start of the handshaker list. + // We want TCP connect handshaker to be registered last so that it is added + // to the start of the handshaker list. RegisterHttpConnectHandshaker(builder); RegisterTCPConnectHandshaker(builder); RegisterPriorityLbPolicy(builder); diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 2a53094a5b6..0fdcc884a8f 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -101,9 +101,11 @@ OpenCensusClientFilter::MakeCallPromise( call_context, path != nullptr ? path->Ref() : grpc_core::Slice(), grpc_core::GetContext(), OpenCensusTracingEnabled() && tracing_enabled_); - GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr); - call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer; - call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr; + GPR_DEBUG_ASSERT( + call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value == + nullptr); + call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer; + call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr; return next_promise_factory(std::move(call_args)); } diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc index f05ca49f77a..eb62c7d5f5e 100644 --- a/src/cpp/ext/filters/census/grpc_plugin.cc +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -32,16 +32,19 @@ #include #include +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/cpp/ext/filters/census/client_filter.h" #include "src/cpp/ext/filters/census/measures.h" -#include "src/cpp/ext/filters/census/server_filter.h" +#include "src/cpp/ext/filters/census/server_call_tracer.h" namespace grpc { void RegisterOpenCensusPlugin() { + grpc_core::ServerCallTracerFactory::RegisterGlobal( + new grpc::internal::OpenCensusServerCallTracerFactory); grpc_core::CoreConfiguration::RegisterBuilder( [](grpc_core::CoreConfiguration::Builder* builder) { builder->channel_init()->RegisterStage( @@ -51,13 +54,6 @@ void RegisterOpenCensusPlugin() { &grpc::internal::OpenCensusClientFilter::kFilter); return true; }); - builder->channel_init()->RegisterStage( - GRPC_SERVER_CHANNEL, /*priority=*/INT_MAX, - [](grpc_core::ChannelStackBuilder* builder) { - builder->PrependFilter( - &grpc::internal::OpenCensusServerFilter::kFilter); - return true; - }); }); // Access measures to ensure they are initialized. Otherwise, creating a view diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 547322f4a9a..9385794f2d4 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -56,7 +56,7 @@ namespace grpc { namespace internal { -class OpenCensusCallTracer : public grpc_core::CallTracer { +class OpenCensusCallTracer : public grpc_core::ClientCallTracer { public: class OpenCensusCallAttemptTracer : public CallAttemptTracer { public: @@ -69,11 +69,14 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { grpc_metadata_batch* /*send_trailing_metadata*/) override {} void RecordSendMessage( const grpc_core::SliceBuffer& /*send_message*/) override; + void RecordSendCompressedMessage( + const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( - grpc_metadata_batch* /*recv_initial_metadata*/, - uint32_t /*flags*/) override {} + grpc_metadata_batch* /*recv_initial_metadata*/) override {} void RecordReceivedMessage( const grpc_core::SliceBuffer& /*recv_message*/) override; + void RecordReceivedDecompressedMessage( + const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_call_tracer.cc similarity index 60% rename from src/cpp/ext/filters/census/server_filter.cc rename to src/cpp/ext/filters/census/server_call_tracer.cc index ac2006c0c05..af7f3865dda 100644 --- a/src/cpp/ext/filters/census/server_filter.cc +++ b/src/cpp/ext/filters/census/server_call_tracer.cc @@ -18,13 +18,12 @@ #include -#include "src/cpp/ext/filters/census/server_filter.h" +#include "src/cpp/ext/filters/census/server_call_tracer.h" #include #include #include -#include #include #include #include @@ -41,18 +40,15 @@ #include -#include "src/core/lib/channel/call_finalization.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" -#include "src/core/lib/promise/cancel_callback.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/map.h" -#include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/poll.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" +#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/transport.h" #include "src/cpp/ext/filters/census/context.h" #include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/measures.h" @@ -91,27 +87,54 @@ void FilterInitialMetadata(grpc_metadata_batch* b, } // namespace -// An OpenCensusServerCallData class will be created for every grpc call within -// a channel. It is used to store data and methods specific to that call. -// OpenCensusServerCallData is thread-compatible, however typically only 1 -// thread should be interacting with a call at a time. -class OpenCensusServerCallData { +// OpenCensusServerCallTracer implementation + +class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { public: // Maximum size of server stats that are sent on the wire. static constexpr uint32_t kMaxServerStatsLen = 16; - explicit OpenCensusServerCallData( - grpc_metadata_batch* client_initial_metadata); + OpenCensusServerCallTracer() + : start_time_(absl::Now()), + recv_message_count_(0), + sent_message_count_(0) {} - void OnSendMessage() { ++sent_message_count_; } + // Please refer to `grpc_transport_stream_op_batch_payload` for details on + // arguments. + void RecordSendInitialMetadata( + grpc_metadata_batch* /*send_initial_metadata*/) override {} - void OnRecvMessage() { ++recv_message_count_; } + void RecordSendTrailingMetadata( + grpc_metadata_batch* send_trailing_metadata) override; - void OnServerTrailingMetadata(grpc_metadata_batch* server_trailing_metadata); + void RecordSendMessage( + const grpc_core::SliceBuffer& /*send_message*/) override { + ++sent_message_count_; + } + void RecordSendCompressedMessage( + const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} - void OnCancel() { elapsed_time_ = absl::Now() - start_time_; } + void RecordReceivedInitialMetadata( + grpc_metadata_batch* recv_initial_metadata) override; - void Finalize(const grpc_call_final_info* final_info); + void RecordReceivedMessage( + const grpc_core::SliceBuffer& /*recv_message*/) override { + ++recv_message_count_; + } + void RecordReceivedDecompressedMessage( + const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} + void RecordReceivedTrailingMetadata( + grpc_metadata_batch* /*recv_trailing_metadata*/) override {} + + void RecordCancel(grpc_error_handle /*cancel_error*/) override { + elapsed_time_ = absl::Now() - start_time_; + } + + void RecordEnd(const grpc_call_final_info* final_info) override; + + void RecordAnnotation(absl::string_view annotation) override { + context_.AddSpanAnnotation(annotation, {}); + } private: experimental::CensusContext context_; @@ -128,13 +151,10 @@ class OpenCensusServerCallData { char stats_buf_[kMaxServerStatsLen]; }; -constexpr uint32_t OpenCensusServerCallData::kMaxServerStatsLen; - -OpenCensusServerCallData::OpenCensusServerCallData( - grpc_metadata_batch* client_initial_metadata) - : start_time_(absl::Now()), recv_message_count_(0), sent_message_count_(0) { +void OpenCensusServerCallTracer::RecordReceivedInitialMetadata( + grpc_metadata_batch* recv_initial_metadata) { ServerMetadataElements sml; - FilterInitialMetadata(client_initial_metadata, &sml); + FilterInitialMetadata(recv_initial_metadata, &sml); path_ = std::move(sml.path); method_ = GetMethod(path_); auto tracing_enabled = OpenCensusTracingEnabled(); @@ -153,7 +173,23 @@ OpenCensusServerCallData::OpenCensusServerCallData( } } -void OpenCensusServerCallData::Finalize( +void OpenCensusServerCallTracer::RecordSendTrailingMetadata( + grpc_metadata_batch* send_trailing_metadata) { + // We need to record the time when the trailing metadata was sent to + // mark the completeness of the request. + elapsed_time_ = absl::Now() - start_time_; + if (OpenCensusStatsEnabled() && send_trailing_metadata != nullptr) { + size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), + stats_buf_, kMaxServerStatsLen); + if (len > 0) { + send_trailing_metadata->Set( + grpc_core::GrpcServerStatsBinMetadata(), + grpc_core::Slice::FromCopiedBuffer(stats_buf_, len)); + } + } +} + +void OpenCensusServerCallTracer::RecordEnd( const grpc_call_final_info* final_info) { if (OpenCensusStatsEnabled()) { const uint64_t request_size = GetOutgoingDataSize(final_info); @@ -178,66 +214,14 @@ void OpenCensusServerCallData::Finalize( } } -void OpenCensusServerCallData::OnServerTrailingMetadata( - grpc_metadata_batch* server_trailing_metadata) { - // We need to record the time when the trailing metadata was sent to - // mark the completeness of the request. - elapsed_time_ = absl::Now() - start_time_; - if (OpenCensusStatsEnabled() && server_trailing_metadata != nullptr) { - size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), - stats_buf_, kMaxServerStatsLen); - if (len > 0) { - server_trailing_metadata->Set( - grpc_core::GrpcServerStatsBinMetadata(), - grpc_core::Slice::FromCopiedBuffer(stats_buf_, len)); - } - } -} - // -// OpenCensusServerFilter +// OpenCensusServerCallTracerFactory // -const grpc_channel_filter OpenCensusServerFilter::kFilter = - grpc_core::MakePromiseBasedFilter< - OpenCensusServerFilter, grpc_core::FilterEndpoint::kServer, - grpc_core::kFilterExaminesServerInitialMetadata | - grpc_core::kFilterExaminesInboundMessages | - grpc_core::kFilterExaminesOutboundMessages>("opencensus_server"); - -absl::StatusOr OpenCensusServerFilter::Create( - const grpc_core::ChannelArgs& /*args*/, - grpc_core::ChannelFilter::Args /*filter_args*/) { - return OpenCensusServerFilter(); -} - -grpc_core::ArenaPromise -OpenCensusServerFilter::MakeCallPromise( - grpc_core::CallArgs call_args, - grpc_core::NextPromiseFactory next_promise_factory) { - auto* calld = grpc_core::GetContext() - ->ManagedNew( - call_args.client_initial_metadata.get()); - call_args.client_to_server_messages->InterceptAndMap( - [calld](grpc_core::MessageHandle message) { - calld->OnRecvMessage(); - return message; - }); - call_args.server_to_client_messages->InterceptAndMap( - [calld](grpc_core::MessageHandle message) { - calld->OnSendMessage(); - return message; - }); - grpc_core::GetContext()->Add( - [calld](const grpc_call_final_info* final_info) { - calld->Finalize(final_info); - }); - return grpc_core::OnCancel(Map(next_promise_factory(std::move(call_args)), - [calld](grpc_core::ServerMetadataHandle md) { - calld->OnServerTrailingMetadata(md.get()); - return md; - }), - [calld]() { calld->OnCancel(); }); +grpc_core::ServerCallTracer* +OpenCensusServerCallTracerFactory::CreateNewServerCallTracer( + grpc_core::Arena* arena) { + return arena->ManagedNew(); } } // namespace internal diff --git a/src/cpp/ext/filters/census/server_call_tracer.h b/src/cpp/ext/filters/census/server_call_tracer.h new file mode 100644 index 00000000000..7eb95c62c1a --- /dev/null +++ b/src/cpp/ext/filters/census/server_call_tracer.h @@ -0,0 +1,40 @@ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H +#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H + +#include + +#include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/resource_quota/arena.h" + +namespace grpc { +namespace internal { + +class OpenCensusServerCallTracerFactory + : public grpc_core::ServerCallTracerFactory { + public: + grpc_core::ServerCallTracer* CreateNewServerCallTracer( + grpc_core::Arena* arena) override; +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_CALL_TRACER_H diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h deleted file mode 100644 index cd178124ac5..00000000000 --- a/src/cpp/ext/filters/census/server_filter.h +++ /dev/null @@ -1,54 +0,0 @@ -// -// -// Copyright 2018 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// - -#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H -#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H - -#include - -#include "absl/status/statusor.h" - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/promise_based_filter.h" -#include "src/core/lib/promise/arena_promise.h" -#include "src/core/lib/transport/transport.h" - -namespace grpc { -namespace internal { - -class OpenCensusServerFilter : public grpc_core::ChannelFilter { - public: - static const grpc_channel_filter kFilter; - - static absl::StatusOr Create( - const grpc_core::ChannelArgs& /*args*/, - grpc_core::ChannelFilter::Args /*filter_args*/); - - grpc_core::ArenaPromise MakeCallPromise( - grpc_core::CallArgs call_args, - grpc_core::NextPromiseFactory next_promise_factory) override; - - private: - OpenCensusServerFilter() = default; -}; - -} // namespace internal -} // namespace grpc - -#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 4f9c6c1b884..83b622e0b38 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -460,6 +460,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', + 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', 'src/core/lib/channel/channel_stack.cc', @@ -470,7 +471,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/promise_based_filter.cc', - 'src/core/lib/channel/server_call_tracer.cc', + 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index 4ae52273155..cbdb10b4bb6 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -91,8 +91,9 @@ TEST(ChannelStackFilters, LooksAsExpected) { std::vector({"authority", "connected"})); EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL), std::vector({"authority", "connected"})); - EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL), - std::vector({"server", "connected"})); + EXPECT_EQ( + MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL), + std::vector({"server", "server_call_tracer", "connected"})); EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL), std::vector( @@ -101,8 +102,8 @@ TEST(ChannelStackFilters, LooksAsExpected) { std::vector( {"authority", "http-client", "compression", "connected"})); EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_SERVER_CHANNEL), - std::vector( - {"server", "http-server", "compression", "connected"})); + std::vector({"server", "http-server", "compression", + "server_call_tracer", "connected"})); EXPECT_EQ(MakeStack(nullptr, minimal_stack_args, GRPC_CLIENT_CHANNEL), std::vector({"client-channel"})); @@ -115,8 +116,8 @@ TEST(ChannelStackFilters, LooksAsExpected) { MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL), std::vector({"authority", "message_size", "connected"})); EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL), - std::vector( - {"server", "message_size", "deadline", "connected"})); + std::vector({"server", "message_size", "deadline", + "server_call_tracer", "connected"})); EXPECT_EQ( MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL), @@ -127,10 +128,10 @@ TEST(ChannelStackFilters, LooksAsExpected) { std::vector({"authority", "message_size", "http-client", "compression", "connected"})); - EXPECT_EQ( - MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL), - std::vector({"server", "message_size", "deadline", - "http-server", "compression", "connected"})); + EXPECT_EQ(MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL), + std::vector({"server", "message_size", "deadline", + "http-server", "compression", + "server_call_tracer", "connected"})); EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL), std::vector({"client-channel"})); } diff --git a/test/core/channel/server_call_tracer_factory_test.cc b/test/core/channel/server_call_tracer_factory_test.cc index 2b88ac4fd15..b4e01a50ab1 100644 --- a/test/core/channel/server_call_tracer_factory_test.cc +++ b/test/core/channel/server_call_tracer_factory_test.cc @@ -14,16 +14,17 @@ #include "gtest/gtest.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/server_call_tracer.h" #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/resource_quota/arena.h" namespace grpc_core { namespace { class TestServerCallTracerFactory : public ServerCallTracerFactory { public: - ServerCallTracer* CreateNewServerCallTracer() override { + ServerCallTracer* CreateNewServerCallTracer(Arena* /*arena*/) override { Crash("Not implemented"); } }; diff --git a/test/core/xds/xds_channel_stack_modifier_test.cc b/test/core/xds/xds_channel_stack_modifier_test.cc index ca026012e87..8da455e365f 100644 --- a/test/core/xds/xds_channel_stack_modifier_test.cc +++ b/test/core/xds/xds_channel_stack_modifier_test.cc @@ -26,7 +26,6 @@ #include "gtest/gtest.h" #include -#include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" @@ -111,44 +110,6 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) { grpc_shutdown(); } -// Test filters insertion with OpenCensus plugin registered -TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) { - CoreConfiguration::Reset(); - grpc::RegisterOpenCensusPlugin(); - grpc_init(); - // Add 2 test filters to XdsChannelStackModifier - const grpc_channel_filter test_filter_1 = { - nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, nullptr, kTestFilter1}; - const grpc_channel_filter test_filter_2 = { - nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, nullptr, kTestFilter2}; - auto channel_stack_modifier = MakeRefCounted( - std::vector{&test_filter_1, &test_filter_2}); - grpc_arg arg = channel_stack_modifier->MakeChannelArg(); - // Create a phony ChannelStackBuilder object - grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); - ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL, - ChannelArgs::FromC(args)); - grpc_channel_args_destroy(args); - grpc_transport_vtable fake_transport_vtable; - memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); - fake_transport_vtable.name = "fake"; - grpc_transport fake_transport = {&fake_transport_vtable}; - builder.SetTransport(&fake_transport); - // Construct channel stack and verify that the test filters were successfully - // added after the census filter - ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack(&builder)); - std::vector filters; - for (const auto& entry : *builder.mutable_stack()) { - filters.push_back(entry->name); - } - filters.resize(4); - EXPECT_EQ(filters, std::vector({"server", "opencensus_server", - kTestFilter1, kTestFilter2})); - grpc_shutdown(); -} - } // namespace } // namespace testing } // namespace grpc_core diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index dc092f276c7..4a41380df9c 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1990,6 +1990,7 @@ src/core/lib/avl/avl.h \ src/core/lib/backoff/backoff.cc \ src/core/lib/backoff/backoff.h \ src/core/lib/channel/call_finalization.h \ +src/core/lib/channel/call_tracer.cc \ src/core/lib/channel/call_tracer.h \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args.h \ @@ -2013,8 +2014,7 @@ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/promise_based_filter.h \ -src/core/lib/channel/server_call_tracer.cc \ -src/core/lib/channel/server_call_tracer.h \ +src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/channel/status_util.h \ src/core/lib/compression/compression.cc \ @@ -2390,6 +2390,7 @@ src/core/lib/matchers/matchers.h \ src/core/lib/promise/activity.cc \ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ +src/core/lib/promise/cancel_callback.h \ src/core/lib/promise/context.h \ src/core/lib/promise/detail/basic_join.h \ src/core/lib/promise/detail/basic_seq.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 9f4eeac988a..22bc220dc85 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1768,6 +1768,7 @@ src/core/lib/backoff/backoff.cc \ src/core/lib/backoff/backoff.h \ src/core/lib/channel/README.md \ src/core/lib/channel/call_finalization.h \ +src/core/lib/channel/call_tracer.cc \ src/core/lib/channel/call_tracer.h \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args.h \ @@ -1791,8 +1792,7 @@ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/promise_based_filter.h \ -src/core/lib/channel/server_call_tracer.cc \ -src/core/lib/channel/server_call_tracer.h \ +src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/channel/status_util.h \ src/core/lib/compression/compression.cc \ @@ -2171,6 +2171,7 @@ src/core/lib/matchers/matchers.h \ src/core/lib/promise/activity.cc \ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ +src/core/lib/promise/cancel_callback.h \ src/core/lib/promise/context.h \ src/core/lib/promise/detail/basic_join.h \ src/core/lib/promise/detail/basic_seq.h \