From 272eebbbcd74c03dbdf66f96121115bb7f9a2f32 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 8 Nov 2017 15:25:51 -0800 Subject: [PATCH 1/3] Only allocate what we need in the last slice for proto serialization --- include/grpc++/impl/codegen/proto_utils.h | 39 ++++---- test/cpp/codegen/proto_utils_test.cc | 110 +++++++++++++++++++--- 2 files changed, 119 insertions(+), 30 deletions(-) diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 0f7e115c9af..b5ad3d8470c 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) - : block_size_(block_size), byte_count_(0), have_backup_(false) { + GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size) + : block_size_(block_size), + total_size_(total_size), + byte_count_(0), + have_backup_(false) { *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; } @@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { } bool Next(void** data, int* size) override { + // Protobuf should not ask for more memory than total_size_. + GPR_CODEGEN_ASSERT(byte_count_ < total_size_); if (have_backup_) { slice_ = backup_slice_; have_backup_ = false; } else { - slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_); + // When less than a whole block is needed, only allocate that much. + // But make sure the allocated slice is not inlined. + size_t remain = total_size_ - byte_count_ > block_size_ + ? block_size_ + : total_size_ - byte_count_; + slice_ = g_core_codegen_interface->grpc_slice_malloc( + remain > GRPC_SLICE_INLINED_SIZE ? remain + : GRPC_SLICE_INLINED_SIZE + 1); } *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit @@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { void BackUp(int count) override { g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); - if (count == block_size_) { + if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { backup_slice_ = slice_; } else { backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( @@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { protected: friend class GrpcBufferWriterPeer; const int block_size_; + const int total_size_; int64_t byte_count_; grpc_slice_buffer* slice_buffer_; bool have_backup_; @@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); - if (byte_size <= kGrpcBufferWriterMaxBufferLength) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); - return g_core_codegen_interface->ok(); - } else { - BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? g_core_codegen_interface->ok() - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } + BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); + return msg.SerializeToZeroCopyStream(&writer) + ? g_core_codegen_interface->ok() + : Status(StatusCode::INTERNAL, "Failed to serialize message"); } // BufferReader must be a subclass of io::ZeroCopyInputStream. diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc index fd05c90e9dc..ba89b299ef3 100644 --- a/test/cpp/codegen/proto_utils_test.cc +++ b/test/cpp/codegen/proto_utils_test.cc @@ -16,15 +16,16 @@ * */ +#include #include #include +#include +#include #include namespace grpc { namespace internal { -static GrpcLibraryInitializer g_gli_initializer; - // Provide access to GrpcBufferWriter internals. class GrpcBufferWriterPeer { public: @@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {}; // GrpcBufferWriter Next()/Backup() invocations could result in a dangling // pointer returned by Next() due to the interaction between grpc_slice inlining // and GRPC_SLICE_START_PTR. -TEST_F(ProtoUtilsTest, BackupNext) { - // Ensure the GrpcBufferWriter internals are initialized. - g_gli_initializer.summon(); - +TEST_F(ProtoUtilsTest, TinyBackupThenNext) { grpc_byte_buffer* bp; - GrpcBufferWriter writer(&bp, 8192); + const int block_size = 1024; + GrpcBufferWriter writer(&bp, block_size, 8192); GrpcBufferWriterPeer peer(&writer); void* data; int size; // Allocate a slice. ASSERT_TRUE(writer.Next(&data, &size)); - EXPECT_EQ(8192, size); - // Return a single byte. Before the fix that this test acts as a regression - // for, this would have resulted in an inlined backup slice. + EXPECT_EQ(block_size, size); + // Return a single byte. writer.BackUp(1); - EXPECT_TRUE(!peer.have_backup()); - // On the next allocation, the slice is non-inlined. + EXPECT_FALSE(peer.have_backup()); + // On the next allocation, the returned slice is non-inlined. ASSERT_TRUE(writer.Next(&data, &size)); EXPECT_TRUE(peer.slice().refcount != NULL); + EXPECT_EQ(block_size, size); // Cleanup. g_core_codegen_interface->grpc_byte_buffer_destroy(bp); } +namespace { + +// Set backup_size to 0 to indicate no backup is needed. +void BufferWriterTest(int block_size, int total_size, int backup_size) { + grpc_byte_buffer* bp; + GrpcBufferWriter writer(&bp, block_size, total_size); + + int written_size = 0; + void* data; + int size = 0; + bool backed_up_entire_slice = false; + + while (written_size < total_size) { + EXPECT_TRUE(writer.Next(&data, &size)); + EXPECT_GT(size, 0); + EXPECT_TRUE(data); + int write_size = size; + bool should_backup = false; + if (backup_size > 0 && size > backup_size) { + write_size = size - backup_size; + should_backup = true; + } else if (size == backup_size && !backed_up_entire_slice) { + // only backup entire slice once. + backed_up_entire_slice = true; + should_backup = true; + write_size = 0; + } + // May need a last backup. + if (write_size + written_size > total_size) { + write_size = total_size - written_size; + should_backup = true; + backup_size = size - write_size; + ASSERT_GT(backup_size, 0); + } + for (int i = 0; i < write_size; i++) { + ((uint8_t*)data)[i] = written_size % 128; + written_size++; + } + if (should_backup) { + writer.BackUp(backup_size); + } + } + EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size); + + grpc_byte_buffer_reader reader; + grpc_byte_buffer_reader_init(&reader, bp); + int read_bytes = 0; + while (read_bytes < total_size) { + grpc_slice s; + EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s)); + for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) { + EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128); + read_bytes++; + } + grpc_slice_unref(s); + } + EXPECT_EQ(read_bytes, total_size); + grpc_byte_buffer_reader_destroy(&reader); + grpc_byte_buffer_destroy(bp); +} + +TEST(WriterTest, TinyBlockTinyBackup) { + for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) { + BufferWriterTest(i, 256, 1); + } +} + +TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); } + +TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); } + +TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); } + +TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); } + +TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); } + +TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); } + +TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); } + +} // namespace } // namespace internal } // namespace grpc int main(int argc, char** argv) { + // Ensure the GrpcBufferWriter internals are initialized. + grpc::internal::GrpcLibraryInitializer init; + init.summon(); + grpc::GrpcLibraryCodegen lib; + ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } From c88185900d5deb54fe02c3cdde3fc9359e185287 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 8 Nov 2017 16:33:49 -0800 Subject: [PATCH 2/3] handle 0 byte size message... --- include/grpc++/impl/codegen/proto_utils.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index b5ad3d8470c..799d236e0d3 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -188,6 +188,11 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); + if (byte_size == 0) { + grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice(); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1); + return g_core_codegen_interface->ok(); + } BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); return msg.SerializeToZeroCopyStream(&writer) ? g_core_codegen_interface->ok() From b90f0e66e503a8fc675c1cdf3a4301b28a27d62e Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 9 Nov 2017 09:51:29 -0800 Subject: [PATCH 3/3] relax for inlined bytes to avoid allocation --- include/grpc++/impl/codegen/proto_utils.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 799d236e0d3..b7636034d4f 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -188,9 +188,14 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); - if (byte_size == 0) { - grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice(); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1); + if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) { + grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); + GPR_CODEGEN_ASSERT( + GRPC_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); + g_core_codegen_interface->grpc_slice_unref(slice); + return g_core_codegen_interface->ok(); } BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);