Merge pull request #978 from Chilledheart/master

Avoid unnecessary copies during protobuf serialization and deserialization
pull/1005/head
Yang Gao 10 years ago
commit 42734b58ee
  1. 2
      include/grpc/support/slice_buffer.h
  2. 7
      src/core/support/slice_buffer.c
  3. 149
      src/cpp/proto/proto_utils.cc
  4. 9
      test/core/support/slice_buffer_test.c

@ -74,6 +74,8 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *slices, size_t n);
/* add a very small (less than 8 bytes) amount of data to the end of a slice
buffer: returns a pointer into which to add the data */
gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned len);
/* pop the last buffer, but don't unref it */
void gpr_slice_buffer_pop(gpr_slice_buffer *sb);
/* clear a slice buffer, unref all elements */
void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb);

@ -143,6 +143,13 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *s, size_t n) {
}
}
void gpr_slice_buffer_pop(gpr_slice_buffer *sb) {
if (sb->count != 0) {
size_t count = --sb->count;
sb->length -= GPR_SLICE_LENGTH(sb->slices[count]);
}
}
void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb) {
size_t i;

@ -35,38 +35,135 @@
#include <grpc++/config.h>
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include <google/protobuf/io/zero_copy_stream.h>
namespace grpc {
const int kMaxBufferLength = 8192;
bool SerializeProto(const grpc::protobuf::Message &msg,
grpc_byte_buffer **bp) {
grpc::string msg_str;
bool success = msg.SerializeToString(&msg_str);
if (success) {
gpr_slice slice =
gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length());
*bp = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
class GrpcBufferWriter GRPC_FINAL
: public ::google::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer **bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
if (have_backup_) {
gpr_slice_unref(backup_slice_);
}
}
bool Next(void **data, int *size) GRPC_OVERRIDE {
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
void BackUp(int count) GRPC_OVERRIDE {
gpr_slice_buffer_pop(slice_buffer_);
if (count == block_size_) {
backup_slice_ = slice_;
} else {
backup_slice_ =
gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
gpr_slice_buffer_add(slice_buffer_, slice_);
}
have_backup_ = true;
byte_count_ -= count;
}
gpr_int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
private:
const int block_size_;
gpr_int64 byte_count_;
gpr_slice_buffer *slice_buffer_;
bool have_backup_;
gpr_slice backup_slice_;
gpr_slice slice_;
};
class GrpcBufferReader GRPC_FINAL
: public ::google::protobuf::io::ZeroCopyInputStream {
public:
explicit GrpcBufferReader(grpc_byte_buffer *buffer)
: byte_count_(0), backup_count_(0) {
reader_ = grpc_byte_buffer_reader_create(buffer);
}
~GrpcBufferReader() GRPC_OVERRIDE {
grpc_byte_buffer_reader_destroy(reader_);
}
return success;
}
bool DeserializeProto(grpc_byte_buffer *buffer,
grpc::protobuf::Message *msg) {
grpc::string msg_string;
grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
gpr_slice slice;
while (grpc_byte_buffer_reader_next(reader, &slice)) {
const char *data = reinterpret_cast<const char *>(
slice.refcount ? slice.data.refcounted.bytes
: slice.data.inlined.bytes);
msg_string.append(data, slice.refcount ? slice.data.refcounted.length
: slice.data.inlined.length);
gpr_slice_unref(slice);
bool Next(const void **data, int *size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_;
*size = backup_count_;
backup_count_ = 0;
return true;
}
if (!grpc_byte_buffer_reader_next(reader_, &slice_)) {
return false;
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
return true;
}
grpc_byte_buffer_reader_destroy(reader);
return msg->ParseFromString(msg_string);
void BackUp(int count) GRPC_OVERRIDE {
backup_count_ = count;
}
bool Skip(int count) GRPC_OVERRIDE {
const void *data;
int size;
while (Next(&data, &size)) {
if (size >= count) {
BackUp(size - count);
return true;
}
// size < count;
count -= size;
}
// error or we have too large count;
return false;
}
gpr_int64 ByteCount() const GRPC_OVERRIDE {
return byte_count_ - backup_count_;
}
private:
gpr_int64 byte_count_;
gpr_int64 backup_count_;
grpc_byte_buffer_reader *reader_;
gpr_slice slice_;
};
namespace grpc {
bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer);
}
bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) {
GrpcBufferReader reader(buffer);
return msg->ParseFromZeroCopyStream(&reader);
}
} // namespace grpc

@ -62,8 +62,13 @@ int main(int argc, char **argv) {
}
GPR_ASSERT(buf.count > 0);
GPR_ASSERT(buf.length == 50);
gpr_slice_unref(aaa);
gpr_slice_unref(bb);
for (i = 0; i < 10; i++) {
gpr_slice_buffer_pop(&buf);
gpr_slice_unref(aaa);
gpr_slice_unref(bb);
}
GPR_ASSERT(buf.count == 0);
GPR_ASSERT(buf.length == 0);
gpr_slice_buffer_destroy(&buf);
return 0;

Loading…
Cancel
Save