Merge branch 'server-port' into inproc-fuzzing

pull/35277/head
Craig Tiller 1 year ago
commit 853c11f144
  1. 3
      BUILD
  2. 4
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 12
      bazel/experiments.bzl
  6. 7
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 2
      gRPC-C++.podspec
  10. 3
      gRPC-Core.podspec
  11. 2
      grpc.gemspec
  12. 2
      grpc.gyp
  13. 2
      package.xml
  14. 1
      src/core/BUILD
  15. 48
      src/core/ext/filters/http/http_filters_plugin.cc
  16. 135
      src/core/ext/filters/http/message_compress/compression_filter.cc
  17. 66
      src/core/ext/filters/http/message_compress/compression_filter.h
  18. 325
      src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
  19. 139
      src/core/ext/filters/http/message_compress/legacy_compression_filter.h
  20. 12
      src/core/ext/transport/chaotic_good/client_transport.cc
  21. 65
      src/core/ext/transport/chaotic_good/client_transport.h
  22. 65
      src/core/ext/transport/chaotic_good/frame.cc
  23. 18
      src/core/ext/transport/chaotic_good/frame.h
  24. 18
      src/core/ext/transport/chaotic_good/frame_header.cc
  25. 14
      src/core/ext/transport/chaotic_good/frame_header.h
  26. 2
      src/core/lib/channel/connected_channel.cc
  27. 182
      src/core/lib/channel/promise_based_filter.h
  28. 72
      src/core/lib/experiments/experiments.cc
  29. 24
      src/core/lib/experiments/experiments.h
  30. 7
      src/core/lib/experiments/experiments.yaml
  31. 3
      src/core/lib/surface/call.cc
  32. 1
      src/python/grpcio/grpc_core_dependencies.py
  33. 5
      test/core/end2end/BUILD
  34. 51
      test/core/end2end/fuzzers/client_fuzzer_corpus/5061521840340992
  35. 4
      test/core/end2end/grpc_core_end2end_test.bzl
  36. 10
      test/core/transport/chaotic_good/frame_fuzzer.cc
  37. BIN
      test/core/transport/chaotic_good/frame_fuzzer_corpus/5072496117219328
  38. 2
      tools/doxygen/Doxyfile.c++.internal
  39. 2
      tools/doxygen/Doxyfile.core.internal

@ -3590,11 +3590,13 @@ grpc_cc_library(
"//src/core:ext/filters/http/client/http_client_filter.cc",
"//src/core:ext/filters/http/http_filters_plugin.cc",
"//src/core:ext/filters/http/message_compress/compression_filter.cc",
"//src/core:ext/filters/http/message_compress/legacy_compression_filter.cc",
"//src/core:ext/filters/http/server/http_server_filter.cc",
],
hdrs = [
"//src/core:ext/filters/http/client/http_client_filter.h",
"//src/core:ext/filters/http/message_compress/compression_filter.h",
"//src/core:ext/filters/http/message_compress/legacy_compression_filter.h",
"//src/core:ext/filters/http/server/http_server_filter.h",
],
external_deps = [
@ -3623,6 +3625,7 @@ grpc_cc_library(
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:experiments",
"//src/core:grpc_message_size_filter",
"//src/core:latch",
"//src/core:map",

4
CMakeLists.txt generated

@ -1826,6 +1826,7 @@ add_library(grpc
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/rbac/rbac_filter.cc
@ -2882,6 +2883,7 @@ add_library(grpc_unsecure
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/transport/chttp2/client/chttp2_connector.cc
@ -12501,8 +12503,8 @@ target_include_directories(frame_header_test
target_link_libraries(frame_header_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::status
absl::statusor
gpr
)

2
Makefile generated

@ -1028,6 +1028,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \
@ -1934,6 +1935,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/transport/chttp2/client/chttp2_connector.cc \

2
Package.swift generated

@ -249,6 +249,8 @@ let package = Package(
"src/core/ext/filters/http/http_filters_plugin.cc",
"src/core/ext/filters/http/message_compress/compression_filter.cc",
"src/core/ext/filters/http/message_compress/compression_filter.h",
"src/core/ext/filters/http/message_compress/legacy_compression_filter.cc",
"src/core/ext/filters/http/message_compress/legacy_compression_filter.h",
"src/core/ext/filters/http/server/http_server_filter.cc",
"src/core/ext/filters/http/server/http_server_filter.h",
"src/core/ext/filters/message_size/message_size_filter.cc",

@ -36,10 +36,10 @@ EXPERIMENT_ENABLES = {
"pending_queue_cap": "pending_queue_cap",
"pick_first_happy_eyeballs": "pick_first_happy_eyeballs",
"promise_based_client_call": "promise_based_client_call",
"promise_based_inproc_transport": "promise_based_inproc_transport",
"promise_based_server_call": "promise_based_server_call",
"red_max_concurrent_streams": "red_max_concurrent_streams",
"registered_method_lookup_in_transport": "registered_method_lookup_in_transport",
"promise_based_inproc_transport": "promise_based_client_call,promise_based_inproc_transport,promise_based_server_call,registered_method_lookup_in_transport",
"registered_methods_map": "registered_methods_map",
"rfc_max_concurrent_streams": "rfc_max_concurrent_streams",
"round_robin_delegate_to_pick_first": "round_robin_delegate_to_pick_first",
@ -51,6 +51,7 @@ EXPERIMENT_ENABLES = {
"trace_record_callops": "trace_record_callops",
"unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size",
"v3_channel_idle_filters": "v3_channel_idle_filters",
"v3_compression_filter": "v3_compression_filter",
"work_serializer_clears_time_cache": "work_serializer_clears_time_cache",
"work_serializer_dispatch": "work_serializer_dispatch",
"write_size_policy": "write_size_policy",
@ -66,6 +67,9 @@ EXPERIMENTS = {
"bad_client_test": [
"rfc_max_concurrent_streams",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
@ -145,6 +149,9 @@ EXPERIMENTS = {
"bad_client_test": [
"rfc_max_concurrent_streams",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
@ -221,6 +228,9 @@ EXPERIMENTS = {
"cancel_ares_query_test": [
"event_engine_dns",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"event_engine_client",
"promise_based_client_call",

@ -276,6 +276,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/message_compress/legacy_compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/filters/rbac/rbac_filter.h
@ -1280,6 +1281,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/filters/rbac/rbac_filter.cc
@ -2210,6 +2212,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/message_compress/legacy_compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/transport/chttp2/client/chttp2_connector.h
@ -2689,6 +2692,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/transport/chttp2/client/chttp2_connector.cc
@ -9606,15 +9610,14 @@ targets:
language: c++
headers:
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/bitset.h
src:
- src/core/ext/transport/chaotic_good/frame_header.cc
- test/core/transport/chaotic_good/frame_header_test.cc
deps:
- gtest
- absl/status:status
- absl/status:statusor
- gpr
- name: fuzzing_event_engine_test
gtest: true
build: test

1
config.m4 generated

@ -115,6 +115,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \

1
config.w32 generated

@ -80,6 +80,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
"src\\core\\ext\\filters\\http\\http_filters_plugin.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\compression_filter.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\legacy_compression_filter.cc " +
"src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\rbac\\rbac_filter.cc " +

2
gRPC-C++.podspec generated

@ -309,6 +309,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',
@ -1562,6 +1563,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',

3
gRPC-Core.podspec generated

@ -352,6 +352,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.cc',
@ -2346,6 +2348,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',

2
grpc.gemspec generated

@ -255,6 +255,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.h )
s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.h )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.h )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )

2
grpc.gyp generated

@ -348,6 +348,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',
@ -1196,6 +1197,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',

2
package.xml generated

@ -237,6 +237,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/http/http_filters_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/legacy_compression_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/legacy_compression_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />

@ -6214,6 +6214,7 @@ grpc_cc_library(
],
deps = [
"bitset",
"//:gpr",
"//:gpr_platform",
],
)

@ -22,10 +22,12 @@
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
@ -38,20 +40,38 @@ bool IsBuildingHttpLikeTransport(const ChannelArgs& args) {
} // namespace
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
if (IsV3CompressionFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
} else {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&LegacyClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&LegacyClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&LegacyServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
}
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter)
.If(IsBuildingHttpLikeTransport)

@ -56,6 +56,11 @@
namespace grpc_core {
const NoInterceptor ServerCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerCompressionFilter::Call::OnFinalize;
const NoInterceptor ClientCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientCompressionFilter::Call::OnFinalize;
const grpc_channel_filter ClientCompressionFilter::kFilter =
MakePromiseBasedFilter<ClientCompressionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata |
@ -77,7 +82,7 @@ absl::StatusOr<ServerCompressionFilter> ServerCompressionFilter::Create(
return ServerCompressionFilter(args);
}
CompressionFilter::CompressionFilter(const ChannelArgs& args)
ChannelCompression::ChannelCompression(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()),
@ -105,7 +110,7 @@ CompressionFilter::CompressionFilter(const ChannelArgs& args)
}
}
MessageHandle CompressionFilter::CompressMessage(
MessageHandle ChannelCompression::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
@ -163,7 +168,7 @@ MessageHandle CompressionFilter::CompressMessage(
return message;
}
absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
@ -208,7 +213,7 @@ absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
return std::move(message);
}
grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata(
grpc_compression_algorithm ChannelCompression::HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata) {
const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest())
.value_or(default_compression_algorithm());
@ -221,7 +226,7 @@ grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata(
return algorithm;
}
CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata(
ChannelCompression::DecompressArgs ChannelCompression::HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata) {
// Configure max receive size.
auto max_recv_message_length = max_recv_size_;
@ -232,89 +237,59 @@ CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata(
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length.has_value() ||
*limits->max_recv_size() < *max_recv_message_length)) {
max_recv_message_length = *limits->max_recv_size();
max_recv_message_length = limits->max_recv_size();
}
return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE),
max_recv_message_length};
}
ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto compression_algorithm =
HandleOutgoingMetadata(*call_args.client_initial_metadata);
call_args.client_to_server_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
[decompress_args, this](ServerMetadataHandle server_initial_metadata)
-> absl::optional<ServerMetadataHandle> {
if (server_initial_metadata == nullptr) return absl::nullopt;
*decompress_args = HandleIncomingMetadata(*server_initial_metadata);
return std::move(server_initial_metadata);
});
call_args.server_to_client_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), *decompress_args);
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
void ClientCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientCompressionFilter* filter) {
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
MessageHandle ClientCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ClientCompressionFilter* filter) {
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}
void ClientCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ClientCompressionFilter* filter) {
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ClientCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
}
void ServerCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerCompressionFilter* filter) {
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ServerCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
}
void ServerCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ServerCompressionFilter* filter) {
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decompress_args =
HandleIncomingMetadata(*call_args.client_initial_metadata);
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.client_to_server_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
auto* compression_algorithm =
GetContext<Arena>()->New<grpc_compression_algorithm>();
call_args.server_initial_metadata->InterceptAndMap(
[this, compression_algorithm](ServerMetadataHandle md) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[compression] Write metadata",
Activity::current()->DebugTag().c_str());
}
// Find the compression algorithm.
*compression_algorithm = HandleOutgoingMetadata(*md);
return md;
});
call_args.server_to_client_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
MessageHandle ServerCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ServerCompressionFilter* filter) {
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}
} // namespace grpc_core

@ -61,15 +61,15 @@ namespace grpc_core {
/// the aforementioned 'grpc-encoding' metadata value, data will pass through
/// uncompressed.
class CompressionFilter : public ChannelFilter {
protected:
class ChannelCompression {
public:
explicit ChannelCompression(const ChannelArgs& args);
struct DecompressArgs {
grpc_compression_algorithm algorithm;
absl::optional<uint32_t> max_recv_message_length;
};
explicit CompressionFilter(const ChannelArgs& args);
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
@ -104,7 +104,8 @@ class CompressionFilter : public ChannelFilter {
bool enable_decompression_;
};
class ClientCompressionFilter final : public CompressionFilter {
class ClientCompressionFilter final
: public ImplementChannelFilter<ClientCompressionFilter> {
public:
static const grpc_channel_filter kFilter;
@ -112,14 +113,35 @@ class ClientCompressionFilter final : public CompressionFilter {
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ClientCompressionFilter* filter);
MessageHandle OnClientToServerMessage(MessageHandle message,
ClientCompressionFilter* filter);
void OnServerInitialMetadata(ServerMetadata& md,
ClientCompressionFilter* filter);
absl::StatusOr<MessageHandle> OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter);
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
private:
grpc_compression_algorithm compression_algorithm_;
ChannelCompression::DecompressArgs decompress_args_;
};
private:
using CompressionFilter::CompressionFilter;
explicit ClientCompressionFilter(const ChannelArgs& args)
: compression_engine_(args) {}
ChannelCompression compression_engine_;
};
class ServerCompressionFilter final : public CompressionFilter {
class ServerCompressionFilter final
: public ImplementChannelFilter<ServerCompressionFilter> {
public:
static const grpc_channel_filter kFilter;
@ -127,11 +149,31 @@ class ServerCompressionFilter final : public CompressionFilter {
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ServerCompressionFilter* filter);
absl::StatusOr<MessageHandle> OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter);
void OnServerInitialMetadata(ServerMetadata& md,
ServerCompressionFilter* filter);
MessageHandle OnServerToClientMessage(MessageHandle message,
ServerCompressionFilter* filter);
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
private:
ChannelCompression::DecompressArgs decompress_args_;
grpc_compression_algorithm compression_algorithm_;
};
private:
using CompressionFilter::CompressionFilter;
explicit ServerCompressionFilter(const ChannelArgs& args)
: compression_engine_(args) {}
ChannelCompression compression_engine_;
};
} // namespace grpc_core

@ -0,0 +1,325 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h"
#include <inttypes.h>
#include <functional>
#include <memory>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/compression_types.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
const grpc_channel_filter LegacyClientCompressionFilter::kFilter =
MakePromiseBasedFilter<
LegacyClientCompressionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
const grpc_channel_filter LegacyServerCompressionFilter::kFilter =
MakePromiseBasedFilter<
LegacyServerCompressionFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
absl::StatusOr<LegacyClientCompressionFilter>
LegacyClientCompressionFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return LegacyClientCompressionFilter(args);
}
absl::StatusOr<LegacyServerCompressionFilter>
LegacyServerCompressionFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return LegacyServerCompressionFilter(args);
}
LegacyCompressionFilter::LegacyCompressionFilter(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()),
default_compression_algorithm_(
DefaultCompressionAlgorithmFromChannelArgs(args).value_or(
GRPC_COMPRESS_NONE)),
enabled_compression_algorithms_(
CompressionAlgorithmSet::FromChannelArgs(args)),
enable_compression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)),
enable_decompression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION)
.value_or(true)) {
// Make sure the default is enabled.
if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) {
const char* name;
if (!grpc_compression_algorithm_name(default_compression_algorithm_,
&name)) {
name = "<unknown>";
}
gpr_log(GPR_ERROR,
"default compression algorithm %s not enabled: switching to none",
name);
default_compression_algorithm_ = GRPC_COMPRESS_NONE;
}
}
MessageHandle LegacyCompressionFilter::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
message->payload()->Length(), algorithm, message->flags());
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
// crime/beast like vulns).
uint32_t& flags = message->mutable_flags();
if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ ||
(flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) {
return message;
}
// Try to compress the payload.
SliceBuffer tmp;
SliceBuffer* payload = message->payload();
bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(),
tmp.c_slice_buffer());
// If we achieved compression send it as compressed, otherwise send it as (to
// avoid spending cycles on the receiver decompressing).
if (did_compress) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
const size_t before_size = payload->Length();
const size_t after_size = tmp.Length();
const float savings_ratio = 1.0f - static_cast<float>(after_size) /
static_cast<float>(before_size);
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
if (call_tracer != nullptr) {
call_tracer->RecordSendCompressedMessage(*message->payload());
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
algo_name, payload->Length());
}
}
return message;
}
absl::StatusOr<MessageHandle> LegacyCompressionFilter::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
message->payload()->Length(),
args.max_recv_message_length.value_or(-1), args.algorithm);
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message->payload());
}
// Check max message length.
if (args.max_recv_message_length.has_value() &&
message->payload()->Length() >
static_cast<size_t>(*args.max_recv_message_length)) {
return absl::ResourceExhaustedError(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
message->payload()->Length(), *args.max_recv_message_length));
}
// Check if decompression is enabled (if not, we can just pass the message
// up).
if (!enable_decompression_ ||
(message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) {
return std::move(message);
}
// Try to decompress the payload.
SliceBuffer decompressed_slices;
if (grpc_msg_decompress(args.algorithm, message->payload()->c_slice_buffer(),
decompressed_slices.c_slice_buffer()) == 0) {
return absl::InternalError(
absl::StrCat("Unexpected error decompressing data for algorithm ",
CompressionAlgorithmAsString(args.algorithm)));
}
// Swap the decompressed slices into the message.
message->payload()->Swap(&decompressed_slices);
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
if (call_tracer != nullptr) {
call_tracer->RecordReceivedDecompressedMessage(*message->payload());
}
return std::move(message);
}
grpc_compression_algorithm LegacyCompressionFilter::HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata) {
const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest())
.value_or(default_compression_algorithm());
// Convey supported compression algorithms.
outgoing_metadata.Set(GrpcAcceptEncodingMetadata(),
enabled_compression_algorithms());
if (algorithm != GRPC_COMPRESS_NONE) {
outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm);
}
return algorithm;
}
LegacyCompressionFilter::DecompressArgs
LegacyCompressionFilter::HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata) {
// Configure max receive size.
auto max_recv_message_length = max_recv_size_;
const MessageSizeParsedConfig* limits =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
message_size_service_config_parser_index_);
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length.has_value() ||
*limits->max_recv_size() < *max_recv_message_length)) {
max_recv_message_length = *limits->max_recv_size();
}
return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE),
max_recv_message_length};
}
ArenaPromise<ServerMetadataHandle>
LegacyClientCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto compression_algorithm =
HandleOutgoingMetadata(*call_args.client_initial_metadata);
call_args.client_to_server_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
[decompress_args, this](ServerMetadataHandle server_initial_metadata)
-> absl::optional<ServerMetadataHandle> {
if (server_initial_metadata == nullptr) return absl::nullopt;
*decompress_args = HandleIncomingMetadata(*server_initial_metadata);
return std::move(server_initial_metadata);
});
call_args.server_to_client_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), *decompress_args);
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
ArenaPromise<ServerMetadataHandle>
LegacyServerCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decompress_args =
HandleIncomingMetadata(*call_args.client_initial_metadata);
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.client_to_server_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
auto* compression_algorithm =
GetContext<Arena>()->New<grpc_compression_algorithm>();
call_args.server_initial_metadata->InterceptAndMap(
[this, compression_algorithm](ServerMetadataHandle md) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[compression] Write metadata",
Activity::current()->DebugTag().c_str());
}
// Find the compression algorithm.
*compression_algorithm = HandleOutgoingMetadata(*md);
return md;
});
call_args.server_to_client_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
} // namespace grpc_core

@ -0,0 +1,139 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H
#define GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/impl/compression_types.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
/// Compression filter for messages.
///
/// See <grpc/compression.h> for the available compression settings.
///
/// Compression settings may come from:
/// - Channel configuration, as established at channel creation time.
/// - The metadata accompanying the outgoing data to be compressed. This is
/// taken as a request only. We may choose not to honor it. The metadata key
/// is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
///
/// Compression can be disabled for concrete messages (for instance in order to
/// prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set
/// in the MessageHandle flags.
///
/// The attempted compression mechanism is added to the resulting initial
/// metadata under the 'grpc-encoding' key.
///
/// If compression is actually performed, the MessageHandle's flag is modified
/// to incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of
/// the aforementioned 'grpc-encoding' metadata value, data will pass through
/// uncompressed.
class LegacyCompressionFilter : public ChannelFilter {
protected:
struct DecompressArgs {
grpc_compression_algorithm algorithm;
absl::optional<uint32_t> max_recv_message_length;
};
explicit LegacyCompressionFilter(const ChannelArgs& args);
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
CompressionAlgorithmSet enabled_compression_algorithms() const {
return enabled_compression_algorithms_;
}
grpc_compression_algorithm HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata);
DecompressArgs HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata);
// Compress one message synchronously.
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(MessageHandle message,
DecompressArgs args) const;
private:
// Max receive message length, if set.
absl::optional<uint32_t> max_recv_size_;
size_t message_size_service_config_parser_index_;
// The default, channel-level, compression algorithm.
grpc_compression_algorithm default_compression_algorithm_;
// Enabled compression algorithms.
CompressionAlgorithmSet enabled_compression_algorithms_;
// Is compression enabled?
bool enable_compression_;
// Is decompression enabled?
bool enable_decompression_;
};
class LegacyClientCompressionFilter final : public LegacyCompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<LegacyClientCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using LegacyCompressionFilter::LegacyCompressionFilter;
};
class LegacyServerCompressionFilter final : public LegacyCompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<LegacyServerCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using LegacyCompressionFilter::LegacyCompressionFilter;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H

@ -16,6 +16,7 @@
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
@ -77,8 +78,7 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get()));
if (frame->message != nullptr) {
std::string message_padding(
frame->frame_header.message_padding, '0');
std::string message_padding(frame->message_padding, '0');
Slice slice(grpc_slice_from_cpp_string(message_padding));
// Append message padding to data_endpoint_buffer.
data_endpoint_write_buffer_.Append(std::move(slice));
@ -157,11 +157,9 @@ ClientTransport::ClientTransport(
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
auto stream_id = frame.frame_header.stream_id;
{
MutexLock lock(&mu_);
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
}
MutexLock lock(&mu_);
const uint32_t stream_id = frame_header_->stream_id;
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {

@ -107,40 +107,37 @@ class ClientTransport {
return TrySeq(
TryJoin(
// Continuously send client frame with client to server messages.
ForEach(
std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
uint32_t message_padding = message_length % aligned_bytes;
frame.frame_header = FrameHeader{
FrameType::kFragment, {}, stream_id, 0, message_length,
message_padding, 0};
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error message
// from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
frame.stream_id = stream_id;
frame.message_padding = message_length % aligned_bytes;
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error
// message from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,

@ -50,12 +50,13 @@ const NoDestruct<Slice> kZeroSlice{[] {
class FrameSerializer {
public:
explicit FrameSerializer(FrameHeader header) : header_(header) {
explicit FrameSerializer(FrameType frame_type, uint32_t stream_id,
uint32_t message_padding) {
output_.AppendIndexed(kZeroSlice->Copy());
// Initialize header flags, header_length, trailer_length to 0.
header_.type = frame_type;
header_.stream_id = stream_id;
header_.message_padding = message_padding;
header_.flags.SetAll(false);
header_.header_length = 0;
header_.trailer_length = 0;
}
// If called, must be called before AddTrailers, Finish.
SliceBuffer& AddHeaders() {
@ -173,11 +174,12 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header,
}
SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const {
FrameSerializer serializer(
FrameHeader{FrameType::kSettings, {}, 0, 0, 0, 0, 0});
FrameSerializer serializer(FrameType::kSettings, 0, 0);
return serializer.Finish();
}
std::string SettingsFrame::ToString() const { return "SettingsFrame{}"; }
absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
@ -185,7 +187,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
}
frame_header = header;
stream_id = header.stream_id;
message_padding = header.message_padding;
if (header.type != FrameType::kFragment) {
return absl::InvalidArgumentError("Expected fragment frame");
}
@ -197,6 +200,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
if (header.trailer_length != 0) {
@ -210,8 +216,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
}
SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(frame_header);
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
@ -221,6 +227,16 @@ SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}
std::string ClientFragmentFrame::ToString() const {
return absl::StrCat(
"ClientFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", end_of_stream=", end_of_stream,
"}");
}
absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
@ -228,7 +244,8 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
}
frame_header = header;
stream_id = header.stream_id;
message_padding = header.message_padding;
FrameDeserializer deserializer(header, slice_buffer);
if (header.flags.is_set(0)) {
auto r =
@ -238,6 +255,9 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
auto r =
@ -247,13 +267,16 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
trailers = std::move(r.value());
}
} else if (header.trailer_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero trailer length", header.trailer_length));
}
return deserializer.Finish();
}
SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(frame_header);
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
@ -263,6 +286,17 @@ SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}
std::string ServerFragmentFrame::ToString() const {
return absl::StrCat(
"ServerFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", trailers=",
trailers.get() != nullptr ? trailers->DebugString().c_str() : "nullptr",
"}");
}
absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
absl::BitGenRef,
SliceBuffer& slice_buffer) {
@ -282,10 +316,13 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
SliceBuffer CancelFrame::Serialize(HPackCompressor*) const {
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(
FrameHeader{FrameType::kCancel, {}, stream_id, 0, 0, 0, 0});
FrameSerializer serializer(FrameType::kCancel, stream_id, 0);
return serializer.Finish();
}
std::string CancelFrame::ToString() const {
return absl::StrCat("CancelFrame{stream_id=", stream_id, "}");
}
} // namespace chaotic_good
} // namespace grpc_core

@ -43,6 +43,7 @@ class FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) = 0;
virtual SliceBuffer Serialize(HPackCompressor* encoder) const = 0;
virtual std::string ToString() const = 0;
protected:
static bool EqVal(const Message& a, const Message& b) {
@ -67,6 +68,7 @@ struct SettingsFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
bool operator==(const SettingsFrame&) const { return true; }
};
@ -76,15 +78,16 @@ struct ClientFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
FrameHeader frame_header;
uint32_t stream_id;
ClientMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
bool end_of_stream = false;
bool operator==(const ClientFragmentFrame& other) const {
return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(headers, other.headers) &&
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
end_of_stream == other.end_of_stream;
}
};
@ -94,15 +97,17 @@ struct ServerFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
FrameHeader frame_header;
uint32_t stream_id;
ServerMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {
return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(headers, other.headers) && EqHdl(trailers, other.trailers);
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
EqHdl(trailers, other.trailers);
}
};
@ -111,6 +116,7 @@ struct CancelFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
uint32_t stream_id;

@ -20,6 +20,8 @@
#include "absl/status/status.h"
#include <grpc/support/log.h>
namespace grpc_core {
namespace chaotic_good {
@ -43,6 +45,8 @@ uint32_t ReadLittleEndianUint32(const uint8_t* data) {
void FrameHeader::Serialize(uint8_t* data) const {
WriteLittleEndianUint32(
static_cast<uint32_t>(type) | (flags.ToInt<uint32_t>() << 8), data);
if (flags.is_set(0)) GPR_ASSERT(header_length > 0);
if (flags.is_set(1)) GPR_ASSERT(trailer_length > 0);
WriteLittleEndianUint32(stream_id, data + 4);
WriteLittleEndianUint32(header_length, data + 8);
WriteLittleEndianUint32(message_length, data + 12);
@ -61,13 +65,15 @@ absl::StatusOr<FrameHeader> FrameHeader::Parse(const uint8_t* data) {
header.stream_id = ReadLittleEndianUint32(data + 4);
header.header_length = ReadLittleEndianUint32(data + 8);
if (header.flags.is_set(0) && header.header_length <= 0) {
return absl::InvalidArgumentError("Invalid header length");
return absl::InvalidArgumentError(
absl::StrCat("Invalid header length: ", header.header_length));
}
header.message_length = ReadLittleEndianUint32(data + 12);
header.message_padding = ReadLittleEndianUint32(data + 16);
header.trailer_length = ReadLittleEndianUint32(data + 20);
if (header.flags.is_set(1) && header.trailer_length <= 0) {
return absl::InvalidArgumentError("Invalid trailer length");
return absl::InvalidArgumentError(
absl::StrCat("Invalid trailer length", header.trailer_length));
}
return header;
}
@ -79,5 +85,13 @@ uint32_t FrameHeader::GetFrameLength() const {
return frame_length;
}
std::string FrameHeader::ToString() const {
return absl::StrFormat(
"[type=0x%02x, flags=0x%02x, stream_id=%d, header_length=%d, "
"message_length=%d, message_padding=%d, trailer_length=%d]",
static_cast<uint8_t>(type), flags.ToInt<uint8_t>(), stream_id,
header_length, message_length, message_padding, trailer_length);
}
} // namespace chaotic_good
} // namespace grpc_core

@ -35,13 +35,13 @@ enum class FrameType : uint8_t {
};
struct FrameHeader {
FrameType type;
FrameType type = FrameType::kCancel;
BitSet<2> flags;
uint32_t stream_id;
uint32_t header_length;
uint32_t message_length;
uint32_t message_padding;
uint32_t trailer_length;
uint32_t stream_id = 0;
uint32_t header_length = 0;
uint32_t message_length = 0;
uint32_t message_padding = 0;
uint32_t trailer_length = 0;
// Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed.
static absl::StatusOr<FrameHeader> Parse(const uint8_t* data);
@ -49,6 +49,8 @@ struct FrameHeader {
void Serialize(uint8_t* data) const;
// Compute frame sizes from the header.
uint32_t GetFrameLength() const;
// Report contents as a string
std::string ToString() const;
bool operator==(const FrameHeader& h) const {
return type == h.type && flags == h.flags && stream_id == h.stream_id &&

@ -411,6 +411,8 @@ auto ConnectedChannelStream::RecvMessages(
}
if (cancel_on_error && !status.ok()) {
incoming_messages.CloseWithError();
} else {
incoming_messages.Close();
}
return Immediate(LoopCtl<absl::Status>(status.status()));
};

@ -139,11 +139,21 @@ inline constexpr bool HasAsyncErrorInterceptor(const NoInterceptor*) {
return false;
}
template <typename T, typename A0, typename... As>
inline constexpr bool HasAsyncErrorInterceptor(A0 (T::*)(A0, As...)) {
return false;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(absl::Status (T::*)(A...)) {
return true;
}
template <typename R, typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(absl::StatusOr<R> (T::*)(A...)) {
return true;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(
ServerMetadataHandle (T::*)(A...)) {
@ -404,6 +414,34 @@ inline void InterceptClientToServerMessage(
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -412,7 +450,7 @@ inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
@ -426,7 +464,7 @@ inline void InterceptClientToServerMessage(
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
@ -435,6 +473,33 @@ inline void InterceptClientToServerMessage(
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, channel](MessageHandle msg) {
return call->OnClientToServerMessage(std::move(msg), channel);
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
});
}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -526,6 +591,36 @@ inline void InterceptServerInitialMetadata(
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap(
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerInitialMetadata(*md, call_data->channel);
return md;
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap(
[call_data](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status =
call_data->call.OnServerInitialMetadata(*md, call_data->channel);
if (!status.ok() && !call_data->error_latch.is_set()) {
call_data->error_latch.Set(ServerMetadataFromStatus(status));
return absl::nullopt;
}
return std::move(md);
});
}
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -555,6 +650,34 @@ inline void InterceptServerInitialMetadata(
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
call->OnServerInitialMetadata(*md, channel);
return md;
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine, channel](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
const CallArgs&) {}
@ -589,6 +712,34 @@ inline void InterceptServerToClientMessage(
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnServerToClientMessage(std::move(msg),
call_data->channel);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnServerToClientMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -620,6 +771,33 @@ inline void InterceptServerToClientMessage(
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, channel](MessageHandle msg) {
return call->OnServerToClientMessage(std::move(msg), channel);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnServerToClientMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
});
}
inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}

@ -93,9 +93,6 @@ const char* const description_promise_based_client_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_client_call = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
@ -108,6 +105,14 @@ const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
"{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall),
static_cast<uint8_t>(
grpc_core::kExperimentIdRegisteredMethodLookupInTransport)};
const char* const description_registered_methods_map =
"Use absl::flat_hash_map for registered methods.";
const char* const additional_constraints_registered_methods_map = "{}";
@ -150,6 +155,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -222,10 +230,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_pick_first_happy_eyeballs, nullptr, 0, true, true},
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call, nullptr, 0, false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"red_max_concurrent_streams", description_red_max_concurrent_streams,
@ -235,6 +239,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, nullptr, 0,
true, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 3, false, false},
{"registered_methods_map", description_registered_methods_map,
additional_constraints_registered_methods_map, nullptr, 0, false, true},
{"rfc_max_concurrent_streams", description_rfc_max_concurrent_streams,
@ -264,6 +272,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,
@ -352,9 +362,6 @@ const char* const description_promise_based_client_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_client_call = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
@ -367,6 +374,14 @@ const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
"{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall),
static_cast<uint8_t>(
grpc_core::kExperimentIdRegisteredMethodLookupInTransport)};
const char* const description_registered_methods_map =
"Use absl::flat_hash_map for registered methods.";
const char* const additional_constraints_registered_methods_map = "{}";
@ -409,6 +424,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -481,10 +499,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_pick_first_happy_eyeballs, nullptr, 0, true, true},
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call, nullptr, 0, false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"red_max_concurrent_streams", description_red_max_concurrent_streams,
@ -494,6 +508,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, nullptr, 0,
true, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 3, false, false},
{"registered_methods_map", description_registered_methods_map,
additional_constraints_registered_methods_map, nullptr, 0, false, true},
{"rfc_max_concurrent_streams", description_rfc_max_concurrent_streams,
@ -523,6 +541,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,
@ -611,9 +631,6 @@ const char* const description_promise_based_client_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_client_call = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
@ -626,6 +643,14 @@ const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
"{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall),
static_cast<uint8_t>(
grpc_core::kExperimentIdRegisteredMethodLookupInTransport)};
const char* const description_registered_methods_map =
"Use absl::flat_hash_map for registered methods.";
const char* const additional_constraints_registered_methods_map = "{}";
@ -668,6 +693,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -740,10 +768,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_pick_first_happy_eyeballs, nullptr, 0, true, true},
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call, nullptr, 0, false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"red_max_concurrent_streams", description_red_max_concurrent_streams,
@ -753,6 +777,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, nullptr, 0,
true, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 3, false, false},
{"registered_methods_map", description_registered_methods_map,
additional_constraints_registered_methods_map, nullptr, 0, false, true},
{"rfc_max_concurrent_streams", description_rfc_max_concurrent_streams,
@ -782,6 +810,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,

@ -91,11 +91,11 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsRedMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRegisteredMethodsMapEnabled() { return false; }
inline bool IsRfcMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
@ -108,6 +108,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -154,11 +155,11 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsRedMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRegisteredMethodsMapEnabled() { return false; }
inline bool IsRfcMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
@ -171,6 +172,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -217,11 +219,11 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsRedMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRegisteredMethodsMapEnabled() { return false; }
inline bool IsRfcMaxConcurrentStreamsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
@ -234,6 +236,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -266,10 +269,10 @@ enum ExperimentIds {
kExperimentIdPendingQueueCap,
kExperimentIdPickFirstHappyEyeballs,
kExperimentIdPromiseBasedClientCall,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdPromiseBasedServerCall,
kExperimentIdRedMaxConcurrentStreams,
kExperimentIdRegisteredMethodLookupInTransport,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdRegisteredMethodsMap,
kExperimentIdRfcMaxConcurrentStreams,
kExperimentIdRoundRobinDelegateToPickFirst,
@ -281,6 +284,7 @@ enum ExperimentIds {
kExperimentIdTraceRecordCallops,
kExperimentIdUnconstrainedMaxQuotaBufferSize,
kExperimentIdV3ChannelIdleFilters,
kExperimentIdV3CompressionFilter,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdWorkSerializerDispatch,
kExperimentIdWriteSizePolicy,
@ -364,10 +368,6 @@ inline bool IsPickFirstHappyEyeballsEnabled() {
inline bool IsPromiseBasedClientCallEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedClientCall);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_INPROC_TRANSPORT
inline bool IsPromiseBasedInprocTransportEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedInprocTransport);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
inline bool IsPromiseBasedServerCallEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedServerCall);
@ -380,6 +380,10 @@ inline bool IsRedMaxConcurrentStreamsEnabled() {
inline bool IsRegisteredMethodLookupInTransportEnabled() {
return IsExperimentEnabled(kExperimentIdRegisteredMethodLookupInTransport);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_INPROC_TRANSPORT
inline bool IsPromiseBasedInprocTransportEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedInprocTransport);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHODS_MAP
inline bool IsRegisteredMethodsMapEnabled() {
return IsExperimentEnabled(kExperimentIdRegisteredMethodsMap);
@ -424,6 +428,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() {
inline bool IsV3ChannelIdleFiltersEnabled() {
return IsExperimentEnabled(kExperimentIdV3ChannelIdleFilters);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_V3_COMPRESSION_FILTER
inline bool IsV3CompressionFilterEnabled() {
return IsExperimentEnabled(kExperimentIdV3CompressionFilter);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache);

@ -169,6 +169,7 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: false # experiment currently crashes if enabled
requires: [promise_based_client_call, promise_based_server_call, registered_method_lookup_in_transport]
- name: promise_based_server_call
description:
If set, use the new gRPC promise based call code when it's appropriate
@ -258,6 +259,12 @@
expiry: 2024/04/04
owner: ctiller@google.com
test_tags: []
- name: v3_compression_filter
description:
Use the compression filter utilizing the v3 filter api
expiry: 2024/04/04
owner: ctiller@google.com
test_tags: ["compression_test"]
- name: work_serializer_clears_time_cache
description:
Have the work serializer clear the time cache when it dispatches work.

@ -2978,7 +2978,8 @@ void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
StartRecvMessage(
op, completion,
[this]() {
return server_initial_metadata_.receiver.AwaitClosed();
return Race(server_initial_metadata_.receiver.AwaitClosed(),
server_to_client_messages_.receiver.AwaitClosed());
},
&server_to_client_messages_.receiver, false, spawner);
break;

@ -89,6 +89,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',

@ -286,7 +286,10 @@ grpc_core_end2end_test(name = "channelz")
grpc_core_end2end_test(name = "client_streaming")
grpc_core_end2end_test(name = "compressed_payload")
grpc_core_end2end_test(
name = "compressed_payload",
tags = ["compression_test"],
)
grpc_core_end2end_test(name = "connectivity")

@ -0,0 +1,51 @@
api_actions {
create_call {
propagation_mask: 6
method {
value: "~"
}
timeout: 34816
}
}
api_actions {
queue_batch {
operations {
send_initial_metadata {
}
}
operations {
send_message {
}
}
operations {
receive_message {
}
}
}
}
api_actions {
create_server {
}
}
api_actions {
disable_tracer: "("
}
api_actions {
destroy_call {
}
}
api_actions {
poll_cq {
}
}
api_actions {
}
api_actions {
shutdown_server {
}
}
config_vars {
verbosity: "\364\204\204\204\364\204\204\204\364\204\204\204\364\204\204\204\364\204\204\204\364\204\204\204\364\204\204\204\364\204\204\204\004"
stacktrace_minloglevel: "yyyyyyyyyyyyy&yyyyyyy\177\014"
experiments: 1099511626496
}

@ -25,7 +25,7 @@ END2END_TEST_DATA = [
"//src/core/tsi/test_creds:server1.pem",
]
def grpc_core_end2end_test(name, shard_count = 10):
def grpc_core_end2end_test(name, shard_count = 10, tags = []):
if len(name) > 60:
fail("test name %s too long" % name)
@ -45,7 +45,7 @@ def grpc_core_end2end_test(name, shard_count = 10):
"absl/types:optional",
"gtest",
],
tags = ["core_end2end_test"],
tags = ["core_end2end_test"] + tags,
deps = [
"end2end_test_main",
"cq_verifier",

@ -56,7 +56,13 @@ void AssertRoundTrips(const T& input, FrameType expected_frame_type) {
uint8_t header_bytes[24];
serialized.MoveFirstNBytesIntoBuffer(24, header_bytes);
auto header = FrameHeader::Parse(header_bytes);
GPR_ASSERT(header.ok());
if (!header.ok()) {
if (!squelch) {
gpr_log(GPR_ERROR, "Failed to parse header: %s",
header.status().ToString().c_str());
}
Crash("Failed to parse header");
}
GPR_ASSERT(header->type == expected_frame_type);
T output;
HPackParser hpack_parser;
@ -79,6 +85,7 @@ void FinishParseAndChecks(const FrameHeader& header, const uint8_t* data,
auto deser = parsed.Deserialize(&hpack_parser, header,
absl::BitGenRef(bitgen), serialized);
if (!deser.ok()) return;
gpr_log(GPR_INFO, "Read frame: %s", parsed.ToString().c_str());
AssertRoundTrips(parsed, header.type);
}
@ -90,6 +97,7 @@ int Run(const uint8_t* data, size_t size) {
if (size < 24) return 0;
auto r = FrameHeader::Parse(data);
if (!r.ok()) return 0;
gpr_log(GPR_INFO, "Read frame header: %s", r->ToString().c_str());
size -= 24;
data += 24;
MemoryAllocator memory_allocator = MemoryAllocator(

@ -1215,6 +1215,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

@ -1024,6 +1024,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

Loading…
Cancel
Save