diff --git a/include/grpc/impl/codegen/byte_buffer_reader.h b/include/grpc/impl/codegen/byte_buffer_reader.h index e06e19558a1..7cd87c82a24 100644 --- a/include/grpc/impl/codegen/byte_buffer_reader.h +++ b/include/grpc/impl/codegen/byte_buffer_reader.h @@ -27,7 +27,6 @@ struct grpc_byte_buffer; struct grpc_byte_buffer_reader { struct grpc_byte_buffer* buffer_in; - struct grpc_byte_buffer* buffer_out; /** Different current objects correspond to different types of byte buffers */ union grpc_byte_buffer_reader_current { /** Index into a slice buffer's array of slices */ diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 26d5984b53a..91df2508a6f 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -175,11 +175,8 @@ typedef struct { GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */ #define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression" /** Experimental Arg. Enable/disable support for per-message decompression. - Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it - defaults to 0. If disabled, decompression will be performed lazily by - grpc_byte_buffer_reader. This arg also determines whether max message limits - will be applied to the decompressed buffer or the non-decompressed buffer. It - is recommended to keep this enabled to protect against zip bomb attacks. */ + Defaults to 1. If disabled, decompression will not be performed and the + application will see the compressed message in the byte buffer. */ #define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION_INSIDE_CORE \ "grpc.per_message_decompression_inside_core" /** Enable/disable support for deadline checking. Defaults to 1, unless diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index 3c90df4a15c..e7024a2b61b 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -47,6 +47,7 @@ static bool is_building_http_like_transport( return t != nullptr && strstr(t->vtable->name, "http"); } +template static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, void* arg) { if (!is_building_http_like_transport(builder)) return true; @@ -55,7 +56,8 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, grpc_channel_stack_builder_get_channel_arguments(builder); bool enable = grpc_channel_arg_get_bool( grpc_channel_args_find(channel_args, filtarg->control_channel_arg), - !grpc_channel_args_want_minimal_stack(channel_args)); + enable_in_minimal_stack || + !grpc_channel_args_want_minimal_stack(channel_args)); return enable ? grpc_channel_stack_builder_prepend_filter( builder, filtarg->filter, nullptr, nullptr) : true; @@ -71,24 +73,24 @@ static bool maybe_add_required_filter(grpc_channel_stack_builder* builder, } void grpc_http_filters_init(void) { - grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &decompress_filter); + maybe_add_optional_filter, &compress_filter); grpc_channel_init_register_stage( GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &decompress_filter); + maybe_add_optional_filter, &compress_filter); grpc_channel_init_register_stage( GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &decompress_filter); + maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_required_filter, (void*)&grpc_http_client_filter); diff --git a/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc index eb90c01449b..a53529c9bb4 100644 --- a/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc +++ b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc @@ -102,7 +102,8 @@ class CallData { // It is initialized during construction and reset when a new stream is // created using it. grpc_slice_buffer recv_slices_; - grpc_core::ManualConstructor + std::aligned_storage::type recv_replacement_stream_; // Fields for handling recv_trailing_metadata_ready callback bool seen_recv_trailing_metadata_ready_ = false; @@ -181,7 +182,8 @@ void CallData::OnRecvMessageReady(void* arg, grpc_error* error) { void CallData::ContinueReadingRecvMessage() { while ((*recv_message_) - ->Next(~static_cast(0), &on_recv_message_next_done_)) { + ->Next((*recv_message_)->length() - recv_slices_.length, + &on_recv_message_next_done_)) { grpc_error* error = PullSliceFromRecvMessage(); if (error != GRPC_ERROR_NONE) { return ContinueRecvMessageReadyCallback(error); @@ -240,8 +242,10 @@ void CallData::FinishRecvMessage() { // batch down. // Initializing recv_replacement_stream_ with decompressed_slices removes // all the slices from decompressed_slices leaving it empty. - recv_replacement_stream_.Init(&decompressed_slices, recv_flags); - recv_message_->reset(recv_replacement_stream_.get()); + new (&recv_replacement_stream_) + grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags); + recv_message_->reset(reinterpret_cast( + &recv_replacement_stream_)); recv_message_ = nullptr; } ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_)); diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc index ed8ecc49590..baf205d8154 100644 --- a/src/core/lib/surface/byte_buffer_reader.cc +++ b/src/core/lib/surface/byte_buffer_reader.cc @@ -22,57 +22,19 @@ #include #include -#include #include #include #include #include -#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" -static int is_compressed(grpc_byte_buffer* buffer) { - switch (buffer->type) { - case GRPC_BB_RAW: - if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) { - return 0 /* GPR_FALSE */; - } - break; - } - return 1 /* GPR_TRUE */; -} - int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer decompressed_slices_buffer; reader->buffer_in = buffer; switch (reader->buffer_in->type) { case GRPC_BB_RAW: - grpc_slice_buffer_init(&decompressed_slices_buffer); - if (is_compressed(reader->buffer_in)) { - if (grpc_msg_decompress( - - grpc_compression_algorithm_to_message_compression_algorithm( - reader->buffer_in->data.raw.compression), - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer) == 0) { - gpr_log(GPR_ERROR, - "Unexpected error decompressing data for algorithm with enum " - "value '%d'.", - reader->buffer_in->data.raw.compression); - memset(reader, 0, sizeof(*reader)); - return 0; - } else { /* all fine */ - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); - } - grpc_slice_buffer_destroy_internal(&decompressed_slices_buffer); - } else { /* not compressed, use the input buffer as output */ - reader->buffer_out = reader->buffer_in; - } reader->current.index = 0; break; } @@ -80,23 +42,14 @@ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, return 1; } -void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) { - switch (reader->buffer_in->type) { - case GRPC_BB_RAW: - /* keeping the same if-else structure as in the init function */ - if (is_compressed(reader->buffer_in)) { - grpc_byte_buffer_destroy(reader->buffer_out); - } - break; - } -} +void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {} int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, grpc_slice** slice) { switch (reader->buffer_in->type) { case GRPC_BB_RAW: { grpc_slice_buffer* slice_buffer; - slice_buffer = &reader->buffer_out->data.raw.slice_buffer; + slice_buffer = &reader->buffer_in->data.raw.slice_buffer; if (reader->current.index < slice_buffer->count) { *slice = &slice_buffer->slices[reader->current.index]; reader->current.index += 1; @@ -113,7 +66,7 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, switch (reader->buffer_in->type) { case GRPC_BB_RAW: { grpc_slice_buffer* slice_buffer; - slice_buffer = &reader->buffer_out->data.raw.slice_buffer; + slice_buffer = &reader->buffer_in->data.raw.slice_buffer; if (reader->current.index < slice_buffer->count) { *slice = grpc_slice_ref_internal( slice_buffer->slices[reader->current.index]); @@ -129,7 +82,7 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) { grpc_slice in_slice; size_t bytes_read = 0; - const size_t input_size = grpc_byte_buffer_length(reader->buffer_out); + const size_t input_size = grpc_byte_buffer_length(reader->buffer_in); grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size); uint8_t* const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */ diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index 8ab5bfe8f72..d02c5806f82 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -68,11 +68,11 @@ int main(int argc, char** argv) { grpc_channel_args minimal_stack_args = {1, &minimal_stack_arg}; errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "connected", NULL); + "authority", "message_decompress", "connected", NULL); errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL, - "authority", "connected", NULL); + "authority", "message_decompress", "connected", NULL); errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL, - "server", "connected", NULL); + "server", "message_decompress", "connected", NULL); errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL, "authority", "http-client", "connected", NULL); diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index f7e64effcd4..3d45b6d647c 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -29,6 +29,8 @@ #include #include #include +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/surface/event_string.h" @@ -145,33 +147,25 @@ int raw_byte_buffer_eq_slice(grpc_byte_buffer* rbb, grpc_slice b) { } int byte_buffer_eq_slice(grpc_byte_buffer* bb, grpc_slice b) { - grpc_byte_buffer_reader reader; - grpc_byte_buffer* rbb; - int res; - - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) && - "Couldn't init byte buffer reader"); - rbb = grpc_raw_byte_buffer_from_reader(&reader); - res = raw_byte_buffer_eq_slice(rbb, b); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(rbb); - - return res; + if (bb->data.raw.compression > GRPC_COMPRESS_NONE) { + grpc_slice_buffer decompressed_buffer; + grpc_slice_buffer_init(&decompressed_buffer); + GPR_ASSERT(grpc_msg_decompress( + grpc_compression_algorithm_to_message_compression_algorithm( + bb->data.raw.compression), + &bb->data.raw.slice_buffer, &decompressed_buffer)); + grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create( + decompressed_buffer.slices, decompressed_buffer.count); + int ret_val = raw_byte_buffer_eq_slice(rbb, b); + grpc_byte_buffer_destroy(rbb); + grpc_slice_buffer_destroy(&decompressed_buffer); + return ret_val; + } + return raw_byte_buffer_eq_slice(bb, b); } int byte_buffer_eq_string(grpc_byte_buffer* bb, const char* str) { - grpc_byte_buffer_reader reader; - grpc_byte_buffer* rbb; - int res; - - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) && - "Couldn't init byte buffer reader"); - rbb = grpc_raw_byte_buffer_from_reader(&reader); - res = raw_byte_buffer_eq_slice(rbb, grpc_slice_from_copied_string(str)); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(rbb); - - return res; + return byte_buffer_eq_slice(bb, grpc_slice_from_copied_string(str)); } static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; } diff --git a/test/core/end2end/tests/compressed_payload.cc b/test/core/end2end/tests/compressed_payload.cc index 463c0fb2f79..6441c1c9633 100644 --- a/test/core/end2end/tests/compressed_payload.cc +++ b/test/core/end2end/tests/compressed_payload.cc @@ -491,6 +491,7 @@ static void request_with_payload_template_inner( cq_verify(cqv); GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW); + gpr_log(GPR_ERROR, "%d", decompress_in_core); GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, response_str)); if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) { const grpc_compression_algorithm algo_for_server_level = diff --git a/test/core/surface/byte_buffer_reader_test.cc b/test/core/surface/byte_buffer_reader_test.cc index fc2654426d7..47d76f49277 100644 --- a/test/core/surface/byte_buffer_reader_test.cc +++ b/test/core/surface/byte_buffer_reader_test.cc @@ -25,7 +25,6 @@ #include #include -#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" @@ -168,75 +167,6 @@ static void test_peek_none_compressed_slice(void) { grpc_byte_buffer_destroy(buffer); } -static void test_read_corrupted_slice(void) { - grpc_slice slice; - grpc_byte_buffer* buffer; - grpc_byte_buffer_reader reader; - - LOG_TEST("test_read_corrupted_slice"); - slice = grpc_slice_from_copied_string("test"); - buffer = grpc_raw_byte_buffer_create(&slice, 1); - buffer->data.raw.compression = GRPC_COMPRESS_GZIP; /* lies! */ - grpc_slice_unref(slice); - GPR_ASSERT(!grpc_byte_buffer_reader_init(&reader, buffer)); - grpc_byte_buffer_destroy(buffer); -} - -static void read_compressed_slice(grpc_compression_algorithm algorithm, - size_t input_size) { - grpc_slice input_slice; - grpc_slice_buffer sliceb_in; - grpc_slice_buffer sliceb_out; - grpc_byte_buffer* buffer; - grpc_byte_buffer_reader reader; - grpc_slice read_slice; - size_t read_count = 0; - - grpc_slice_buffer_init(&sliceb_in); - grpc_slice_buffer_init(&sliceb_out); - - input_slice = grpc_slice_malloc(input_size); - memset(GRPC_SLICE_START_PTR(input_slice), 'a', input_size); - grpc_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */ - { - grpc_core::ExecCtx exec_ctx; - GPR_ASSERT(grpc_msg_compress( - - grpc_compression_algorithm_to_message_compression_algorithm(algorithm), - &sliceb_in, &sliceb_out)); - } - - buffer = grpc_raw_compressed_byte_buffer_create(sliceb_out.slices, - sliceb_out.count, algorithm); - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) && - "Couldn't init byte buffer reader"); - - while (grpc_byte_buffer_reader_next(&reader, &read_slice)) { - GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(read_slice), - GRPC_SLICE_START_PTR(input_slice) + read_count, - GRPC_SLICE_LENGTH(read_slice)) == 0); - read_count += GRPC_SLICE_LENGTH(read_slice); - grpc_slice_unref(read_slice); - } - GPR_ASSERT(read_count == input_size); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(buffer); - grpc_slice_buffer_destroy(&sliceb_out); - grpc_slice_buffer_destroy(&sliceb_in); -} - -static void test_read_gzip_compressed_slice(void) { - const size_t INPUT_SIZE = 2048; - LOG_TEST("test_read_gzip_compressed_slice"); - read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE); -} - -static void test_read_deflate_compressed_slice(void) { - const size_t INPUT_SIZE = 2048; - LOG_TEST("test_read_deflate_compressed_slice"); - read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE); -} - static void test_byte_buffer_from_reader(void) { grpc_slice slice; grpc_byte_buffer *buffer, *buffer_from_reader; @@ -342,9 +272,6 @@ int main(int argc, char** argv) { test_peek_one_slice(); test_peek_one_slice_malloc(); test_peek_none_compressed_slice(); - test_read_gzip_compressed_slice(); - test_read_deflate_compressed_slice(); - test_read_corrupted_slice(); test_byte_buffer_from_reader(); test_byte_buffer_copy(); test_readall();