got the new filter impls compiling

pull/31204/head
Craig Tiller 3 years ago
parent 066fb7e40e
commit d0021e92f4
  1. 5
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  2. 19
      src/core/ext/filters/http/message_compress/message_decompress_filter.cc
  3. 10
      src/core/ext/filters/http/message_compress/message_decompress_filter.h
  4. 12
      src/core/lib/gprpp/bitset.h
  5. 10
      test/core/compression/BUILD
  6. 42
      test/core/compression/args_utils.cc
  7. 31
      test/core/compression/args_utils.h
  8. 65
      test/core/compression/compression_test.cc
  9. 37
      test/core/end2end/fixtures/h2_compress.cc
  10. 2
      test/core/end2end/generate_tests.bzl
  11. 78
      test/core/end2end/tests/compressed_payload.cc
  12. 8
      test/cpp/microbenchmarks/bm_call_create.cc

@ -58,6 +58,11 @@ const grpc_channel_filter MessageCompressFilter::kServerFilter =
MakePromiseBasedFilter<MessageCompressFilter, FilterEndpoint::kServer>(
"message_compress");
absl::StatusOr<MessageCompressFilter> MessageCompressFilter::Create(
const ChannelArgs& args, ChannelFilter::Args) {
return MessageCompressFilter(args);
}
MessageCompressFilter::MessageCompressFilter(const ChannelArgs& args)
: default_compression_algorithm_(
DefaultCompressionAlgorithmFromChannelArgs(args).value_or(

@ -52,6 +52,25 @@
namespace grpc_core {
const grpc_channel_filter ClientMessageDecompressFilter::kFilter =
MakePromiseBasedFilter<ClientMessageDecompressFilter,
FilterEndpoint::kClient>("message_decompress");
const grpc_channel_filter ServerMessageDecompressFilter::kFilter =
MakePromiseBasedFilter<ServerMessageDecompressFilter,
FilterEndpoint::kServer>("message_decompress");
absl::StatusOr<ClientMessageDecompressFilter>
ClientMessageDecompressFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return ClientMessageDecompressFilter(args);
}
absl::StatusOr<ServerMessageDecompressFilter>
ServerMessageDecompressFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return ServerMessageDecompressFilter(args);
}
MessageDecompressFilter::MessageDecompressFilter(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(

@ -46,24 +46,30 @@ class ClientMessageDecompressFilter final : public MessageDecompressFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<MessageDecompressFilter> Create(
static absl::StatusOr<ClientMessageDecompressFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using MessageDecompressFilter::MessageDecompressFilter;
};
class ServerMessageDecompressFilter final : public MessageDecompressFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<MessageDecompressFilter> Create(
static absl::StatusOr<ServerMessageDecompressFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using MessageDecompressFilter::MessageDecompressFilter;
};
} // namespace grpc_core

@ -159,6 +159,18 @@ class BitSet {
return result;
}
BitSet& Set(int i, bool value) {
set(i, value);
return *this;
}
BitSet& SetAll(bool value) {
for (size_t i = 0; i < kTotalBits; i++) {
set(i, value);
}
return *this;
}
private:
// Given a bit index, return which unit it's stored in.
static constexpr size_t unit_for(size_t bit) { return bit / kUnitBits; }

@ -24,15 +24,6 @@ grpc_package(name = "test/core/compression")
licenses(["notice"])
grpc_cc_library(
name = "args_utils",
testonly = 1,
srcs = ["args_utils.cc"],
hdrs = ["args_utils.h"],
visibility = ["//visibility:public"],
deps = ["//:grpc"],
)
grpc_cc_test(
name = "compression_test",
srcs = ["compression_test.cc"],
@ -41,7 +32,6 @@ grpc_cc_test(
uses_event_engine = False,
uses_polling = False,
deps = [
":args_utils",
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",

@ -1,42 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/compression/args_utils.h"
#include <string.h>
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/gpr/useful.h"
namespace grpc_core {
ChannelArgs SetCompressionAlgorithmState(const ChannelArgs& args,
grpc_compression_algorithm algorithm,
bool enabled) {
auto state = args.GetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)
.value_or(0);
SetBit(&state, GRPC_COMPRESS_NONE);
if (enabled) {
SetBit(&state, algorithm);
} else {
ClearBit(&state, algorithm);
}
return args.Set(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, state);
}
} // namespace grpc_core

@ -1,31 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H
#define GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_args.h"
namespace grpc_core {
ChannelArgs SetCompressionAlgorithmState(const ChannelArgs& args,
grpc_compression_algorithm algorithm,
bool enabled);
} // namespace grpc_core
#endif

@ -30,7 +30,6 @@
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/compression/args_utils.h"
#include "test/core/util/test_config.h"
TEST(CompressionTest, CompressionAlgorithmParse) {
@ -245,70 +244,6 @@ TEST(CompressionTest, CompressionEnableDisableAlgorithm) {
}
}
TEST(CompressionTest, ChannelArgsSetCompressionAlgorithm) {
grpc_core::ExecCtx exec_ctx;
const grpc_channel_args* ch_args;
ch_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, GRPC_COMPRESS_GZIP);
ASSERT_EQ(ch_args->num_args, 1);
ASSERT_STREQ(ch_args->args[0].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
ASSERT_EQ(ch_args->args[0].type, GRPC_ARG_INTEGER);
grpc_channel_args_destroy(ch_args);
}
TEST(CompressionTest, ChannelArgsCompressionAlgorithmStates) {
grpc_core::ExecCtx exec_ctx;
grpc_core::CompressionAlgorithmSet states;
const grpc_channel_args* ch_args =
grpc_channel_args_copy_and_add(nullptr, nullptr, 0);
/* by default, all enabled */
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
/* disable gzip and deflate and stream/gzip */
const grpc_channel_args* ch_args_wo_gzip =
grpc_channel_args_compression_algorithm_set_state(&ch_args,
GRPC_COMPRESS_GZIP, 0);
ASSERT_EQ(ch_args, ch_args_wo_gzip);
const grpc_channel_args* ch_args_wo_gzip_deflate =
grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip, GRPC_COMPRESS_DEFLATE, 0);
ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate);
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(
ch_args_wo_gzip_deflate);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_GZIP || i == GRPC_COMPRESS_DEFLATE) {
ASSERT_FALSE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
} else {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
}
/* re-enabled gzip only */
ch_args_wo_gzip = grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip_deflate, GRPC_COMPRESS_GZIP, 1);
ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate);
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args_wo_gzip);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_DEFLATE) {
ASSERT_FALSE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
} else {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
}
grpc_channel_args_destroy(ch_args);
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);

@ -28,19 +28,13 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/compression/args_utils.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
struct fullstack_compression_fixture_data {
~fullstack_compression_fixture_data() {
grpc_channel_args_destroy(client_args_compression);
grpc_channel_args_destroy(server_args_compression);
}
~fullstack_compression_fixture_data() {}
std::string localaddr;
const grpc_channel_args* client_args_compression = nullptr;
const grpc_channel_args* server_args_compression = nullptr;
};
static grpc_end2end_test_fixture chttp2_create_fixture_fullstack_compression(
@ -63,16 +57,13 @@ void chttp2_init_client_fullstack_compression(
grpc_end2end_test_fixture* f, const grpc_channel_args* client_args) {
fullstack_compression_fixture_data* ffd =
static_cast<fullstack_compression_fixture_data*>(f->fixture_data);
if (ffd->client_args_compression != nullptr) {
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(ffd->client_args_compression);
}
ffd->client_args_compression =
grpc_channel_args_set_channel_default_compression_algorithm(
client_args, GRPC_COMPRESS_GZIP);
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
f->client = grpc_channel_create(ffd->localaddr.c_str(), creds,
ffd->client_args_compression);
f->client = grpc_channel_create(
ffd->localaddr.c_str(), creds,
grpc_core::ChannelArgs::FromC(client_args)
.Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, GRPC_COMPRESS_GZIP)
.ToC()
.get());
grpc_channel_credentials_release(creds);
}
@ -80,17 +71,15 @@ void chttp2_init_server_fullstack_compression(
grpc_end2end_test_fixture* f, const grpc_channel_args* server_args) {
fullstack_compression_fixture_data* ffd =
static_cast<fullstack_compression_fixture_data*>(f->fixture_data);
if (ffd->server_args_compression != nullptr) {
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(ffd->server_args_compression);
}
ffd->server_args_compression =
grpc_channel_args_set_channel_default_compression_algorithm(
server_args, GRPC_COMPRESS_GZIP);
if (f->server) {
grpc_server_destroy(f->server);
}
f->server = grpc_server_create(ffd->server_args_compression, nullptr);
f->server = grpc_server_create(
grpc_core::ChannelArgs::FromC(server_args)
.Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, GRPC_COMPRESS_GZIP)
.ToC()
.get(),
nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_credentials* server_creds =
grpc_insecure_server_credentials_create();

@ -451,7 +451,6 @@ def grpc_end2end_tests():
":local_util",
"//test/core/util:test_lb_policies",
"//:grpc_authorization_provider",
"//test/core/compression:args_utils",
"//:grpc_http_filters",
"//:event_log",
],
@ -476,7 +475,6 @@ def grpc_end2end_tests():
"//test/core/util:grpc_test_util",
"//:grpc",
"//:gpr",
"//test/core/compression:args_utils",
"//:grpc_http_filters",
],
tags = _platform_support_tags(fopt) + fopt.tags,

@ -35,7 +35,6 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_test_only.h"
#include "test/core/compression/args_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/test_config.h"
@ -109,8 +108,8 @@ static void request_for_disabled_algorithm(
grpc_call* s;
grpc_slice request_payload_slice;
grpc_byte_buffer* request_payload;
const grpc_channel_args* client_args;
const grpc_channel_args* server_args;
grpc_core::ChannelArgs client_args;
grpc_core::ChannelArgs server_args;
grpc_end2end_test_fixture f;
grpc_op ops[6];
grpc_op* op;
@ -130,28 +129,26 @@ static void request_for_disabled_algorithm(
request_payload_slice = grpc_slice_from_copied_string(str);
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
client_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, requested_client_compression_algorithm);
server_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, GRPC_COMPRESS_NONE);
server_args = grpc_channel_args_compression_algorithm_set_state(
&server_args, algorithm_to_disable, false);
client_args =
grpc_core::ChannelArgs().Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
requested_client_compression_algorithm);
server_args =
grpc_core::ChannelArgs()
.Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, GRPC_COMPRESS_NONE)
.Set(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
grpc_core::BitSet<GRPC_COMPRESS_ALGORITHMS_COUNT>()
.SetAll(true)
.Set(algorithm_to_disable, false)
.ToInt<uint32_t>());
if (!decompress_in_core) {
grpc_arg disable_decompression_in_core_arg =
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
const grpc_channel_args* old_client_args = client_args;
const grpc_channel_args* old_server_args = server_args;
client_args = grpc_channel_args_copy_and_add(
client_args, &disable_decompression_in_core_arg, 1);
server_args = grpc_channel_args_copy_and_add(
server_args, &disable_decompression_in_core_arg, 1);
grpc_channel_args_destroy(old_client_args);
grpc_channel_args_destroy(old_server_args);
client_args =
client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
server_args =
server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
}
f = begin_test(config, test_name, client_args, server_args,
decompress_in_core);
f = begin_test(config, test_name, client_args.ToC().get(),
server_args.ToC().get(), decompress_in_core);
grpc_core::CqVerifier cqv(f.cq);
gpr_timespec deadline = five_seconds_from_now();
@ -266,8 +263,6 @@ static void request_for_disabled_algorithm(
grpc_slice_unref(request_payload_slice);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_channel_args_destroy(client_args);
grpc_channel_args_destroy(server_args);
end_test(&f);
config.tear_down_data(&f);
}
@ -286,8 +281,8 @@ static void request_with_payload_template_inner(
grpc_call* s;
grpc_slice request_payload_slice;
grpc_byte_buffer* request_payload = nullptr;
const grpc_channel_args* client_args;
const grpc_channel_args* server_args;
grpc_core::ChannelArgs client_args;
grpc_core::ChannelArgs server_args;
grpc_end2end_test_fixture f;
grpc_op ops[6];
grpc_op* op;
@ -315,25 +310,20 @@ static void request_with_payload_template_inner(
grpc_slice response_payload_slice =
grpc_slice_from_copied_string(response_str);
client_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, default_client_channel_compression_algorithm);
server_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, default_server_channel_compression_algorithm);
client_args = grpc_core::ChannelArgs().Set(
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
default_client_channel_compression_algorithm);
server_args = grpc_core::ChannelArgs().Set(
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
default_server_channel_compression_algorithm);
if (!decompress_in_core) {
grpc_arg disable_decompression_in_core_arg =
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
const grpc_channel_args* old_client_args = client_args;
const grpc_channel_args* old_server_args = server_args;
client_args = grpc_channel_args_copy_and_add(
client_args, &disable_decompression_in_core_arg, 1);
server_args = grpc_channel_args_copy_and_add(
server_args, &disable_decompression_in_core_arg, 1);
grpc_channel_args_destroy(old_client_args);
grpc_channel_args_destroy(old_server_args);
client_args =
client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
server_args =
server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
}
f = begin_test(config, test_name, client_args, server_args,
decompress_in_core);
f = begin_test(config, test_name, client_args.ToC().get(),
server_args.ToC().get(), decompress_in_core);
grpc_core::CqVerifier cqv(f.cq);
gpr_timespec deadline = five_seconds_from_now();
@ -558,8 +548,6 @@ static void request_with_payload_template_inner(
grpc_call_unref(c);
grpc_call_unref(s);
grpc_channel_args_destroy(client_args);
grpc_channel_args_destroy(server_args);
end_test(&f);
config.tear_down_data(&f);
}

@ -593,7 +593,9 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, PhonyFilter, SendEmptyMetadata);
typedef Fixture<&grpc_core::ClientChannel::kFilterVtable, 0>
ClientChannelFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp);
typedef Fixture<&grpc_message_compress_filter, CHECKS_NOT_LAST> CompressFilter;
typedef Fixture<&grpc_core::MessageCompressFilter::kClientFilter,
CHECKS_NOT_LAST>
CompressFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata);
typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST>
@ -613,7 +615,9 @@ typedef Fixture<&grpc_core::HttpServerFilter::kFilter, CHECKS_NOT_LAST>
HttpServerFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata);
typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter;
typedef Fixture<&grpc_core::MessageCompressFilter::kServerFilter,
CHECKS_NOT_LAST>
MessageSizeFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata);
// This cmake target is disabled for now because it depends on OpenCensus, which

Loading…
Cancel
Save