Merge pull request #13315 from yang-g/proto_utils

Only allocate what we need in the last slice for proto serialization
pull/13046/head
Yang Gao 7 years ago committed by GitHub
commit 5d7cb43022
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      include/grpc++/impl/codegen/proto_utils.h
  2. 110
      test/cpp/codegen/proto_utils_test.cc

@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
: block_size_(block_size),
total_size_(total_size),
byte_count_(0),
have_backup_(false) {
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
}
bool Next(void** data, int* size) override {
// Protobuf should not ask for more memory than total_size_.
GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
// When less than a whole block is needed, only allocate that much.
// But make sure the allocated slice is not inlined.
size_t remain = total_size_ - byte_count_ > block_size_
? block_size_
: total_size_ - byte_count_;
slice_ = g_core_codegen_interface->grpc_slice_malloc(
remain > GRPC_SLICE_INLINED_SIZE ? remain
: GRPC_SLICE_INLINED_SIZE + 1);
}
*data = GRPC_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
void BackUp(int count) override {
g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
if (count == block_size_) {
if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
backup_slice_ = slice_;
} else {
backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
protected:
friend class GrpcBufferWriterPeer;
const int block_size_;
const int total_size_;
int64_t byte_count_;
grpc_slice_buffer* slice_buffer_;
bool have_backup_;
@ -175,20 +188,20 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
// BufferReader must be a subclass of io::ZeroCopyInputStream.

@ -16,15 +16,16 @@
*
*/
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/proto_utils.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc/impl/codegen/byte_buffer.h>
#include <grpc/slice.h>
#include <gtest/gtest.h>
namespace grpc {
namespace internal {
static GrpcLibraryInitializer g_gli_initializer;
// Provide access to GrpcBufferWriter internals.
class GrpcBufferWriterPeer {
public:
@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {};
// GrpcBufferWriter Next()/Backup() invocations could result in a dangling
// pointer returned by Next() due to the interaction between grpc_slice inlining
// and GRPC_SLICE_START_PTR.
TEST_F(ProtoUtilsTest, BackupNext) {
// Ensure the GrpcBufferWriter internals are initialized.
g_gli_initializer.summon();
TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
grpc_byte_buffer* bp;
GrpcBufferWriter writer(&bp, 8192);
const int block_size = 1024;
GrpcBufferWriter writer(&bp, block_size, 8192);
GrpcBufferWriterPeer peer(&writer);
void* data;
int size;
// Allocate a slice.
ASSERT_TRUE(writer.Next(&data, &size));
EXPECT_EQ(8192, size);
// Return a single byte. Before the fix that this test acts as a regression
// for, this would have resulted in an inlined backup slice.
EXPECT_EQ(block_size, size);
// Return a single byte.
writer.BackUp(1);
EXPECT_TRUE(!peer.have_backup());
// On the next allocation, the slice is non-inlined.
EXPECT_FALSE(peer.have_backup());
// On the next allocation, the returned slice is non-inlined.
ASSERT_TRUE(writer.Next(&data, &size));
EXPECT_TRUE(peer.slice().refcount != NULL);
EXPECT_EQ(block_size, size);
// Cleanup.
g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
}
namespace {
// Set backup_size to 0 to indicate no backup is needed.
void BufferWriterTest(int block_size, int total_size, int backup_size) {
grpc_byte_buffer* bp;
GrpcBufferWriter writer(&bp, block_size, total_size);
int written_size = 0;
void* data;
int size = 0;
bool backed_up_entire_slice = false;
while (written_size < total_size) {
EXPECT_TRUE(writer.Next(&data, &size));
EXPECT_GT(size, 0);
EXPECT_TRUE(data);
int write_size = size;
bool should_backup = false;
if (backup_size > 0 && size > backup_size) {
write_size = size - backup_size;
should_backup = true;
} else if (size == backup_size && !backed_up_entire_slice) {
// only backup entire slice once.
backed_up_entire_slice = true;
should_backup = true;
write_size = 0;
}
// May need a last backup.
if (write_size + written_size > total_size) {
write_size = total_size - written_size;
should_backup = true;
backup_size = size - write_size;
ASSERT_GT(backup_size, 0);
}
for (int i = 0; i < write_size; i++) {
((uint8_t*)data)[i] = written_size % 128;
written_size++;
}
if (should_backup) {
writer.BackUp(backup_size);
}
}
EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, bp);
int read_bytes = 0;
while (read_bytes < total_size) {
grpc_slice s;
EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
read_bytes++;
}
grpc_slice_unref(s);
}
EXPECT_EQ(read_bytes, total_size);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(bp);
}
TEST(WriterTest, TinyBlockTinyBackup) {
for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
BufferWriterTest(i, 256, 1);
}
}
TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
} // namespace
} // namespace internal
} // namespace grpc
int main(int argc, char** argv) {
// Ensure the GrpcBufferWriter internals are initialized.
grpc::internal::GrpcLibraryInitializer init;
init.summon();
grpc::GrpcLibraryCodegen lib;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save