Removing compression from grpc_byte_buffer_reader, removing ManualConstructor in decompress filter and fixing tests

reviewable/pr22575/r5
Yash Tibrewal 5 years ago
parent ce8a1df713
commit d24387ae4f
  1. 1
      include/grpc/impl/codegen/byte_buffer_reader.h
  2. 7
      include/grpc/impl/codegen/grpc_types.h
  3. 28
      src/core/ext/filters/http/http_filters_plugin.cc
  4. 12
      src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
  5. 55
      src/core/lib/surface/byte_buffer_reader.cc
  6. 6
      test/core/channel/minimal_stack_is_minimal_test.cc
  7. 42
      test/core/end2end/cq_verifier.cc
  8. 1
      test/core/end2end/tests/compressed_payload.cc
  9. 73
      test/core/surface/byte_buffer_reader_test.cc

@ -27,7 +27,6 @@ struct grpc_byte_buffer;
struct grpc_byte_buffer_reader {
struct grpc_byte_buffer* buffer_in;
struct grpc_byte_buffer* buffer_out;
/** Different current objects correspond to different types of byte buffers */
union grpc_byte_buffer_reader_current {
/** Index into a slice buffer's array of slices */

@ -175,11 +175,8 @@ typedef struct {
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
#define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression"
/** Experimental Arg. Enable/disable support for per-message decompression.
Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it
defaults to 0. If disabled, decompression will be performed lazily by
grpc_byte_buffer_reader. This arg also determines whether max message limits
will be applied to the decompressed buffer or the non-decompressed buffer. It
is recommended to keep this enabled to protect against zip bomb attacks. */
Defaults to 1. If disabled, decompression will not be performed and the
application will see the compressed message in the byte buffer. */
#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION_INSIDE_CORE \
"grpc.per_message_decompression_inside_core"
/** Enable/disable support for deadline checking. Defaults to 1, unless

@ -47,6 +47,7 @@ static bool is_building_http_like_transport(
return t != nullptr && strstr(t->vtable->name, "http");
}
template <bool enable_in_minimal_stack>
static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder,
void* arg) {
if (!is_building_http_like_transport(builder)) return true;
@ -55,7 +56,8 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder,
grpc_channel_stack_builder_get_channel_arguments(builder);
bool enable = grpc_channel_arg_get_bool(
grpc_channel_args_find(channel_args, filtarg->control_channel_arg),
!grpc_channel_args_want_minimal_stack(channel_args));
enable_in_minimal_stack ||
!grpc_channel_args_want_minimal_stack(channel_args));
return enable ? grpc_channel_stack_builder_prepend_filter(
builder, filtarg->filter, nullptr, nullptr)
: true;
@ -71,24 +73,24 @@ static bool maybe_add_required_filter(grpc_channel_stack_builder* builder,
}
void grpc_http_filters_init(void) {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &decompress_filter);
maybe_add_optional_filter<false>, &compress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &decompress_filter);
maybe_add_optional_filter<false>, &compress_filter);
grpc_channel_init_register_stage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &decompress_filter);
maybe_add_optional_filter<false>, &compress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter<true>, &decompress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter<true>, &decompress_filter);
grpc_channel_init_register_stage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter<true>, &decompress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_required_filter, (void*)&grpc_http_client_filter);

@ -102,7 +102,8 @@ class CallData {
// It is initialized during construction and reset when a new stream is
// created using it.
grpc_slice_buffer recv_slices_;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream),
alignof(grpc_core::SliceBufferByteStream)>::type
recv_replacement_stream_;
// Fields for handling recv_trailing_metadata_ready callback
bool seen_recv_trailing_metadata_ready_ = false;
@ -181,7 +182,8 @@ void CallData::OnRecvMessageReady(void* arg, grpc_error* error) {
void CallData::ContinueReadingRecvMessage() {
while ((*recv_message_)
->Next(~static_cast<size_t>(0), &on_recv_message_next_done_)) {
->Next((*recv_message_)->length() - recv_slices_.length,
&on_recv_message_next_done_)) {
grpc_error* error = PullSliceFromRecvMessage();
if (error != GRPC_ERROR_NONE) {
return ContinueRecvMessageReadyCallback(error);
@ -240,8 +242,10 @@ void CallData::FinishRecvMessage() {
// batch down.
// Initializing recv_replacement_stream_ with decompressed_slices removes
// all the slices from decompressed_slices leaving it empty.
recv_replacement_stream_.Init(&decompressed_slices, recv_flags);
recv_message_->reset(recv_replacement_stream_.get());
new (&recv_replacement_stream_)
grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags);
recv_message_->reset(reinterpret_cast<grpc_core::SliceBufferByteStream*>(
&recv_replacement_stream_));
recv_message_ = nullptr;
}
ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_));

@ -22,57 +22,19 @@
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
static int is_compressed(grpc_byte_buffer* buffer) {
switch (buffer->type) {
case GRPC_BB_RAW:
if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
return 0 /* GPR_FALSE */;
}
break;
}
return 1 /* GPR_TRUE */;
}
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
grpc_core::ExecCtx exec_ctx;
grpc_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
grpc_slice_buffer_init(&decompressed_slices_buffer);
if (is_compressed(reader->buffer_in)) {
if (grpc_msg_decompress(
grpc_compression_algorithm_to_message_compression_algorithm(
reader->buffer_in->data.raw.compression),
&reader->buffer_in->data.raw.slice_buffer,
&decompressed_slices_buffer) == 0) {
gpr_log(GPR_ERROR,
"Unexpected error decompressing data for algorithm with enum "
"value '%d'.",
reader->buffer_in->data.raw.compression);
memset(reader, 0, sizeof(*reader));
return 0;
} else { /* all fine */
reader->buffer_out =
grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
decompressed_slices_buffer.count);
}
grpc_slice_buffer_destroy_internal(&decompressed_slices_buffer);
} else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
}
reader->current.index = 0;
break;
}
@ -80,23 +42,14 @@ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
return 1;
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
/* keeping the same if-else structure as in the init function */
if (is_compressed(reader->buffer_in)) {
grpc_byte_buffer_destroy(reader->buffer_out);
}
break;
}
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {}
int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) {
switch (reader->buffer_in->type) {
case GRPC_BB_RAW: {
grpc_slice_buffer* slice_buffer;
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
slice_buffer = &reader->buffer_in->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = &slice_buffer->slices[reader->current.index];
reader->current.index += 1;
@ -113,7 +66,7 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
switch (reader->buffer_in->type) {
case GRPC_BB_RAW: {
grpc_slice_buffer* slice_buffer;
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
slice_buffer = &reader->buffer_in->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = grpc_slice_ref_internal(
slice_buffer->slices[reader->current.index]);
@ -129,7 +82,7 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) {
grpc_slice in_slice;
size_t bytes_read = 0;
const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
const size_t input_size = grpc_byte_buffer_length(reader->buffer_in);
grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size);
uint8_t* const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */

@ -68,11 +68,11 @@ int main(int argc, char** argv) {
grpc_channel_args minimal_stack_args = {1, &minimal_stack_arg};
errors +=
CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL,
"authority", "connected", NULL);
"authority", "message_decompress", "connected", NULL);
errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL,
"authority", "connected", NULL);
"authority", "message_decompress", "connected", NULL);
errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL,
"server", "connected", NULL);
"server", "message_decompress", "connected", NULL);
errors +=
CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL,
"authority", "http-client", "connected", NULL);

@ -29,6 +29,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/surface/event_string.h"
@ -145,33 +147,25 @@ int raw_byte_buffer_eq_slice(grpc_byte_buffer* rbb, grpc_slice b) {
}
int byte_buffer_eq_slice(grpc_byte_buffer* bb, grpc_slice b) {
grpc_byte_buffer_reader reader;
grpc_byte_buffer* rbb;
int res;
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) &&
"Couldn't init byte buffer reader");
rbb = grpc_raw_byte_buffer_from_reader(&reader);
res = raw_byte_buffer_eq_slice(rbb, b);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(rbb);
return res;
if (bb->data.raw.compression > GRPC_COMPRESS_NONE) {
grpc_slice_buffer decompressed_buffer;
grpc_slice_buffer_init(&decompressed_buffer);
GPR_ASSERT(grpc_msg_decompress(
grpc_compression_algorithm_to_message_compression_algorithm(
bb->data.raw.compression),
&bb->data.raw.slice_buffer, &decompressed_buffer));
grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create(
decompressed_buffer.slices, decompressed_buffer.count);
int ret_val = raw_byte_buffer_eq_slice(rbb, b);
grpc_byte_buffer_destroy(rbb);
grpc_slice_buffer_destroy(&decompressed_buffer);
return ret_val;
}
return raw_byte_buffer_eq_slice(bb, b);
}
int byte_buffer_eq_string(grpc_byte_buffer* bb, const char* str) {
grpc_byte_buffer_reader reader;
grpc_byte_buffer* rbb;
int res;
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) &&
"Couldn't init byte buffer reader");
rbb = grpc_raw_byte_buffer_from_reader(&reader);
res = raw_byte_buffer_eq_slice(rbb, grpc_slice_from_copied_string(str));
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(rbb);
return res;
return byte_buffer_eq_slice(bb, grpc_slice_from_copied_string(str));
}
static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; }

@ -491,6 +491,7 @@ static void request_with_payload_template_inner(
cq_verify(cqv);
GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW);
gpr_log(GPR_ERROR, "%d", decompress_in_core);
GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, response_str));
if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) {
const grpc_compression_algorithm algo_for_server_level =

@ -25,7 +25,6 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
@ -168,75 +167,6 @@ static void test_peek_none_compressed_slice(void) {
grpc_byte_buffer_destroy(buffer);
}
static void test_read_corrupted_slice(void) {
grpc_slice slice;
grpc_byte_buffer* buffer;
grpc_byte_buffer_reader reader;
LOG_TEST("test_read_corrupted_slice");
slice = grpc_slice_from_copied_string("test");
buffer = grpc_raw_byte_buffer_create(&slice, 1);
buffer->data.raw.compression = GRPC_COMPRESS_GZIP; /* lies! */
grpc_slice_unref(slice);
GPR_ASSERT(!grpc_byte_buffer_reader_init(&reader, buffer));
grpc_byte_buffer_destroy(buffer);
}
static void read_compressed_slice(grpc_compression_algorithm algorithm,
size_t input_size) {
grpc_slice input_slice;
grpc_slice_buffer sliceb_in;
grpc_slice_buffer sliceb_out;
grpc_byte_buffer* buffer;
grpc_byte_buffer_reader reader;
grpc_slice read_slice;
size_t read_count = 0;
grpc_slice_buffer_init(&sliceb_in);
grpc_slice_buffer_init(&sliceb_out);
input_slice = grpc_slice_malloc(input_size);
memset(GRPC_SLICE_START_PTR(input_slice), 'a', input_size);
grpc_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */
{
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(grpc_msg_compress(
grpc_compression_algorithm_to_message_compression_algorithm(algorithm),
&sliceb_in, &sliceb_out));
}
buffer = grpc_raw_compressed_byte_buffer_create(sliceb_out.slices,
sliceb_out.count, algorithm);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
"Couldn't init byte buffer reader");
while (grpc_byte_buffer_reader_next(&reader, &read_slice)) {
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(read_slice),
GRPC_SLICE_START_PTR(input_slice) + read_count,
GRPC_SLICE_LENGTH(read_slice)) == 0);
read_count += GRPC_SLICE_LENGTH(read_slice);
grpc_slice_unref(read_slice);
}
GPR_ASSERT(read_count == input_size);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(buffer);
grpc_slice_buffer_destroy(&sliceb_out);
grpc_slice_buffer_destroy(&sliceb_in);
}
static void test_read_gzip_compressed_slice(void) {
const size_t INPUT_SIZE = 2048;
LOG_TEST("test_read_gzip_compressed_slice");
read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE);
}
static void test_read_deflate_compressed_slice(void) {
const size_t INPUT_SIZE = 2048;
LOG_TEST("test_read_deflate_compressed_slice");
read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
}
static void test_byte_buffer_from_reader(void) {
grpc_slice slice;
grpc_byte_buffer *buffer, *buffer_from_reader;
@ -342,9 +272,6 @@ int main(int argc, char** argv) {
test_peek_one_slice();
test_peek_one_slice_malloc();
test_peek_none_compressed_slice();
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
test_read_corrupted_slice();
test_byte_buffer_from_reader();
test_byte_buffer_copy();
test_readall();

Loading…
Cancel
Save