Merge pull request #10743 from ncteisen/serialization-refactor

Add to Codegen Interface, Refactor Proto Serialization
pull/10930/head^2
Noah Eisen 8 years ago committed by GitHub
commit c4a7f8dfe9
  1. 2
      include/grpc++/impl/codegen/config_protobuf.h
  2. 6
      include/grpc++/impl/codegen/core_codegen.h
  3. 7
      include/grpc++/impl/codegen/core_codegen_interface.h
  4. 111
      include/grpc++/impl/codegen/proto_utils.h
  5. 14
      src/cpp/common/core_codegen.cc

@ -34,6 +34,8 @@
#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPC_OPEN_SOURCE_PROTO
#ifndef GRPC_CUSTOM_PROTOBUF_INT64 #ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h> #include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64

@ -90,11 +90,15 @@ class CoreCodegen final : public CoreCodegenInterface {
grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) override; size_t nslices) override;
grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) override;
grpc_slice grpc_empty_slice() override; grpc_slice grpc_empty_slice() override;
grpc_slice grpc_slice_malloc(size_t length) override; grpc_slice grpc_slice_malloc(size_t length) override;
void grpc_slice_unref(grpc_slice slice) override; void grpc_slice_unref(grpc_slice slice) override;
grpc_slice grpc_slice_ref(grpc_slice slice) override;
grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override; grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override;
grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override;
void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override;
void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override;
grpc_slice grpc_slice_from_static_buffer(const void* buffer, grpc_slice grpc_slice_from_static_buffer(const void* buffer,

@ -101,15 +101,18 @@ class CoreCodegenInterface {
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0; size_t nslices) = 0;
virtual grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) = 0;
virtual void grpc_call_ref(grpc_call* call) = 0; virtual void grpc_call_ref(grpc_call* call) = 0;
virtual void grpc_call_unref(grpc_call* call) = 0; virtual void grpc_call_unref(grpc_call* call) = 0;
virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0; virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0;
virtual grpc_slice grpc_empty_slice() = 0; virtual grpc_slice grpc_empty_slice() = 0;
virtual grpc_slice grpc_slice_malloc(size_t length) = 0; virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0; virtual void grpc_slice_unref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0;
virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb,
grpc_slice slice) = 0; grpc_slice slice) = 0;
virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0;

@ -54,8 +54,7 @@ class GrpcBufferWriterPeer;
const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter final class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public: public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) { : block_size_(block_size), byte_count_(0), have_backup_(false) {
@ -103,6 +102,8 @@ class GrpcBufferWriter final
grpc::protobuf::int64 ByteCount() const override { return byte_count_; } grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
grpc_slice_buffer* SliceBuffer() { return slice_buffer_; }
private: private:
friend class GrpcBufferWriterPeer; friend class GrpcBufferWriterPeer;
const int block_size_; const int block_size_;
@ -113,8 +114,7 @@ class GrpcBufferWriter final
grpc_slice slice_; grpc_slice slice_;
}; };
class GrpcBufferReader final class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public: public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer) explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0), status_() { : byte_count_(0), backup_count_(0), status_() {
@ -175,64 +175,91 @@ class GrpcBufferReader final
return byte_count_ - backup_count_; return byte_count_ - backup_count_;
} }
private: protected:
int64_t byte_count_; int64_t byte_count_;
int64_t backup_count_; int64_t backup_count_;
grpc_byte_buffer_reader reader_; grpc_byte_buffer_reader reader_;
grpc_slice slice_; grpc_slice slice_;
Status status_; Status status_;
}; };
template <class BufferWriter, class T>
Status GenericSerialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
BufferWriter writer(bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
template <class BufferReader, class T>
Status GenericDeserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
"BufferReader must be a subclass of io::ZeroCopyInputStream");
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
Status result = g_core_codegen_interface->ok();
{
BufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
}
} // namespace internal } // namespace internal
// this is needed so the following class does not conflict with protobuf
// serializers that utilize internal-only tools.
#ifdef GRPC_OPEN_SOURCE_PROTO
// This class provides a protobuf serializer. It translates between protobuf
// objects and grpc_byte_buffers. More information about SerializationTraits can
// be found in include/grpc++/impl/codegen/serialization_traits.h.
template <class T> template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of< class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> { grpc::protobuf::Message, T>::value>::type> {
public: public:
static Status Serialize(const grpc::protobuf::Message& msg, static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) { grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true; return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
int byte_size = msg.ByteSize(); msg, bp, own_buffer);
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
internal::GrpcBufferWriter writer(
bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
} }
static Status Deserialize(grpc_byte_buffer* buffer, static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) { grpc::protobuf::Message* msg) {
if (buffer == nullptr) { return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
return Status(StatusCode::INTERNAL, "No payload"); msg);
}
Status result = g_core_codegen_interface->ok();
{
internal::GrpcBufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
} }
}; };
#endif
} // namespace grpc } // namespace grpc

@ -134,6 +134,12 @@ grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice,
return ::grpc_raw_byte_buffer_create(slice, nslices); return ::grpc_raw_byte_buffer_create(slice, nslices);
} }
grpc_slice CoreCodegen::grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) {
return ::grpc_slice_new_with_user_data(p, len, destroy, user_data);
}
grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); } grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); }
grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) { grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) {
@ -144,10 +150,18 @@ void CoreCodegen::grpc_slice_unref(grpc_slice slice) {
::grpc_slice_unref(slice); ::grpc_slice_unref(slice);
} }
grpc_slice CoreCodegen::grpc_slice_ref(grpc_slice slice) {
return ::grpc_slice_ref(slice);
}
grpc_slice CoreCodegen::grpc_slice_split_tail(grpc_slice* s, size_t split) { grpc_slice CoreCodegen::grpc_slice_split_tail(grpc_slice* s, size_t split) {
return ::grpc_slice_split_tail(s, split); return ::grpc_slice_split_tail(s, split);
} }
grpc_slice CoreCodegen::grpc_slice_split_head(grpc_slice* s, size_t split) {
return ::grpc_slice_split_head(s, split);
}
grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer, grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer,
size_t length) { size_t length) {
return ::grpc_slice_from_static_buffer(buffer, length); return ::grpc_slice_from_static_buffer(buffer, length);

Loading…
Cancel
Save