Refactor SerializationTraits

pull/13199/head
ncteisen 7 years ago
parent 0f59869001
commit 023726202a
  1. 2
      include/grpc++/impl/codegen/config_protobuf.h
  2. 1
      include/grpc++/impl/codegen/core_codegen.h
  3. 1
      include/grpc++/impl/codegen/core_codegen_interface.h
  4. 112
      include/grpc++/impl/codegen/proto_utils.h
  5. 4
      src/cpp/common/core_codegen.cc

@ -19,6 +19,8 @@
#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPC_OPEN_SOURCE_PROTO
#ifndef GRPC_CUSTOM_PROTOBUF_INT64 #ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h> #include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64

@ -89,6 +89,7 @@ class CoreCodegen final : public CoreCodegenInterface {
grpc_slice grpc_slice_ref(grpc_slice slice) override; grpc_slice grpc_slice_ref(grpc_slice slice) override;
grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override; grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override;
grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override; grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override;
grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) override;
void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override;
void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override;
grpc_slice grpc_slice_from_static_buffer(const void* buffer, grpc_slice grpc_slice_from_static_buffer(const void* buffer,

@ -103,6 +103,7 @@ class CoreCodegenInterface {
virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0; virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0; virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0;
virtual grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) = 0;
virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb,
grpc_slice slice) = 0; grpc_slice slice) = 0;
virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0;

@ -39,8 +39,7 @@ class GrpcBufferWriterPeer;
const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter final class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public: public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) { : block_size_(block_size), byte_count_(0), have_backup_(false) {
@ -88,6 +87,8 @@ class GrpcBufferWriter final
grpc::protobuf::int64 ByteCount() const override { return byte_count_; } grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
grpc_slice_buffer* SliceBuffer() { return slice_buffer_; }
private: private:
friend class GrpcBufferWriterPeer; friend class GrpcBufferWriterPeer;
const int block_size_; const int block_size_;
@ -98,8 +99,7 @@ class GrpcBufferWriter final
grpc_slice slice_; grpc_slice slice_;
}; };
class GrpcBufferReader final class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public: public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer) explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0), status_() { : byte_count_(0), backup_count_(0), status_() {
@ -160,7 +160,7 @@ class GrpcBufferReader final
return byte_count_ - backup_count_; return byte_count_ - backup_count_;
} }
private: protected:
int64_t byte_count_; int64_t byte_count_;
int64_t backup_count_; int64_t backup_count_;
grpc_byte_buffer_reader reader_; grpc_byte_buffer_reader reader_;
@ -168,57 +168,85 @@ class GrpcBufferReader final
Status status_; Status status_;
}; };
// BufferWriter must be a subclass of io::ZeroCopyOutputStream.
template <class BufferWriter, class T>
Status GenericSerialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
// BufferReader must be a subclass of io::ZeroCopyInputStream.
template <class BufferReader, class T>
Status GenericDeserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
"BufferReader must be a subclass of io::ZeroCopyInputStream");
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
Status result = g_core_codegen_interface->ok();
{
BufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
}
} // namespace internal } // namespace internal
// this is needed so the following class does not conflict with protobuf
// serializers that utilize internal-only tools.
#ifdef GRPC_OPEN_SOURCE_PROTO
// This class provides a protobuf serializer. It translates between protobuf
// objects and grpc_byte_buffers. More information about SerializationTraits can
// be found in include/grpc++/impl/codegen/serialization_traits.h.
template <class T> template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of< class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> { grpc::protobuf::Message, T>::value>::type> {
public: public:
static Status Serialize(const grpc::protobuf::Message& msg, static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) { grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true; return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
int byte_size = msg.ByteSize(); msg, bp, own_buffer);
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
internal::GrpcBufferWriter writer(
bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
} }
static Status Deserialize(grpc_byte_buffer* buffer, static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) { grpc::protobuf::Message* msg) {
if (buffer == nullptr) { return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
return Status(StatusCode::INTERNAL, "No payload"); msg);
}
Status result = g_core_codegen_interface->ok();
{
internal::GrpcBufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
} }
}; };
#endif
} // namespace grpc } // namespace grpc

@ -156,6 +156,10 @@ grpc_slice CoreCodegen::grpc_slice_split_head(grpc_slice* s, size_t split) {
return ::grpc_slice_split_head(s, split); return ::grpc_slice_split_head(s, split);
} }
grpc_slice CoreCodegen::grpc_slice_sub(grpc_slice s, size_t begin, size_t end) {
return ::grpc_slice_sub(s, begin, end);
}
grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer, grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer,
size_t length) { size_t length) {
return ::grpc_slice_from_static_buffer(buffer, length); return ::grpc_slice_from_static_buffer(buffer, length);

Loading…
Cancel
Save