[C++] Added a cord support to gRPC protobuf serializer (#32617)

As Protobuf is going to support Cord to reduce memory copy when
[de]serializing Cord fields, gRPC is going to leverage it. This
implementation is based on the internal one but it's slightly modified
to use the public APIs of Cord. only
pull/32897/head
Esun Kim 2 years ago committed by GitHub
parent 9128604b03
commit c523bdac1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 76
      CMakeLists.txt
  3. 20
      build_autogenerated.yaml
  4. 2
      include/grpcpp/impl/codegen/config_protobuf.h
  5. 59
      include/grpcpp/support/proto_buffer_reader.h
  6. 37
      include/grpcpp/support/proto_buffer_writer.h
  7. 30
      test/cpp/util/BUILD
  8. 88
      test/cpp/util/proto_buffer_reader_test.cc
  9. 80
      test/cpp/util/proto_buffer_writer_test.cc
  10. 48
      tools/run_tests/generated/tests.json

@ -801,6 +801,7 @@ grpc_cc_library(
name = "grpc++_public_hdrs",
hdrs = GRPCXX_PUBLIC_HDRS,
external_deps = [
"absl/strings:cord",
"absl/synchronization",
"protobuf_headers",
"protobuf",
@ -823,6 +824,9 @@ grpc_cc_library(
"src/cpp/common/secure_auth_context.h",
"src/cpp/server/secure_server_credentials.h",
],
external_deps = [
"absl/strings:cord",
],
language = "c++",
public_hdrs = GRPCXX_PUBLIC_HDRS,
select_deps = [
@ -1965,6 +1969,7 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc++_codegen_proto",
external_deps = [
"absl/strings:cord",
"protobuf_headers",
"protobuf",
],

76
CMakeLists.txt generated

@ -1116,6 +1116,8 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx promise_factory_test)
add_dependencies(buildtests_cxx promise_map_test)
add_dependencies(buildtests_cxx promise_test)
add_dependencies(buildtests_cxx proto_buffer_reader_test)
add_dependencies(buildtests_cxx proto_buffer_writer_test)
add_dependencies(buildtests_cxx proto_server_reflection_test)
add_dependencies(buildtests_cxx proto_utils_test)
add_dependencies(buildtests_cxx qps_json_driver)
@ -16843,6 +16845,80 @@ target_link_libraries(promise_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(proto_buffer_reader_test
test/cpp/util/proto_buffer_reader_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(proto_buffer_reader_test PUBLIC cxx_std_14)
target_include_directories(proto_buffer_reader_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(proto_buffer_reader_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(proto_buffer_writer_test
test/cpp/util/proto_buffer_writer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(proto_buffer_writer_test PUBLIC cxx_std_14)
target_include_directories(proto_buffer_writer_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(proto_buffer_writer_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -10168,6 +10168,26 @@ targets:
- absl/meta:type_traits
- gpr
uses_polling: false
- name: proto_buffer_reader_test
gtest: true
build: test
language: c++
headers: []
src:
- test/cpp/util/proto_buffer_reader_test.cc
deps:
- grpc++_test_util
uses_polling: false
- name: proto_buffer_writer_test
gtest: true
build: test
language: c++
headers: []
src:
- test/cpp/util/proto_buffer_writer_test.cc
deps:
- grpc++_test_util
uses_polling: false
- name: proto_server_reflection_test
gtest: true
build: test

@ -23,6 +23,8 @@
#define GRPC_OPEN_SOURCE_PROTO
#define GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
#ifndef GRPC_CUSTOM_MESSAGE
#ifdef GRPC_USE_PROTO_LITE
#include <google/protobuf/message_lite.h>

@ -21,6 +21,8 @@
#include <type_traits>
#include "absl/strings/cord.h"
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/grpc_types.h>
@ -120,6 +122,52 @@ class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
/// Returns the total number of bytes read since this object was created.
int64_t ByteCount() const override { return byte_count_ - backup_count_; }
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
/// Read the next `count` bytes and append it to the given Cord.
// (override is intentionally omitted here to support old Protobuf which
// doesn't have ReadCord method)
// NOLINTNEXTLINE(modernize-use-override)
virtual bool ReadCord(absl::Cord* cord, int count) {
if (!status().ok()) {
return false;
}
// check for backed up data
if (backup_count() > 0) {
if (backup_count() <= count) {
cord->Append(MakeCordFromSlice(grpc_slice_split_tail(
slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count())));
} else {
cord->Append(MakeCordFromSlice(grpc_slice_sub(
*slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count(),
GRPC_SLICE_LENGTH(*slice()) - backup_count() + count)));
}
int64_t take = std::min(backup_count(), static_cast<int64_t>(count));
set_backup_count(backup_count() - take);
count -= take;
if (count == 0) {
return true;
}
}
while (count > 0) {
if (!grpc_byte_buffer_reader_peek(reader(), mutable_slice_ptr())) {
return false;
}
uint64_t slice_length = GRPC_SLICE_LENGTH(*slice());
set_byte_count(ByteCount() + slice_length);
if (slice_length <= count) {
cord->Append(MakeCordFromSlice(grpc_slice_ref(*slice())));
count -= slice_length;
} else {
cord->Append(MakeCordFromSlice(grpc_slice_split_head(slice(), count)));
set_backup_count(slice_length - count);
return true;
}
}
GPR_ASSERT(count == 0);
return true;
}
#endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
// These protected members are needed to support internal optimizations.
// they expose internal bits of grpc core that are NOT stable. If you have
// a use case needs to use one of these functions, please send an email to
@ -133,6 +181,17 @@ class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
grpc_slice** mutable_slice_ptr() { return &slice_; }
private:
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
// This function takes ownership of slice and return a newly created Cord off
// of it.
static absl::Cord MakeCordFromSlice(grpc_slice slice) {
return absl::MakeCordFromExternal(
absl::string_view(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)),
GRPC_SLICE_LENGTH(slice)),
[slice](absl::string_view view) { grpc_slice_unref(slice); });
}
#endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
int64_t byte_count_; ///< total bytes read since object creation
int64_t backup_count_; ///< how far backed up in the stream we are
grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice

@ -21,6 +21,8 @@
#include <type_traits>
#include "absl/strings/cord.h"
#include <grpc/byte_buffer.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
@ -149,6 +151,41 @@ class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream {
/// Returns the total number of bytes written since this object was created.
int64_t ByteCount() const override { return byte_count_; }
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
/// Writes cord to the backing byte_buffer, sharing the memory between the
/// blocks of the cord, and the slices of the byte_buffer.
// (override is intentionally omitted here to support old Protobuf which
// doesn't have ReadCord method)
// NOLINTNEXTLINE(modernize-use-override)
virtual bool WriteCord(const absl::Cord& cord) {
grpc_slice_buffer* buffer = slice_buffer();
size_t cur = 0;
for (absl::string_view chunk : cord.Chunks()) {
// TODO(veblush): Revisit this 512 threadhold which could be smaller.
if (chunk.size() < 512) {
// If chunk is small enough, just copy it.
grpc_slice slice =
grpc_slice_from_copied_buffer(chunk.data(), chunk.size());
grpc_slice_buffer_add(buffer, slice);
} else {
// If chunk is large, just use the pointer instead of copying.
// To make sure it's alive while being used, a subcord for chunk is
// created and attached to a grpc_slice instance.
absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size()));
grpc_slice slice = grpc_slice_new_with_user_data(
const_cast<uint8_t*>(
reinterpret_cast<const uint8_t*>(chunk.data())),
chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); },
subcord);
grpc_slice_buffer_add(buffer, slice);
}
cur += chunk.size();
}
set_byte_count(ByteCount() + cur);
return true;
}
#endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
// These protected members are needed to support internal optimizations.
// they expose internal bits of grpc core that are NOT stable. If you have
// a use case needs to use one of these functions, please send an email to

@ -393,3 +393,33 @@ grpc_cc_library(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "proto_buffer_reader_test",
srcs = [
"proto_buffer_reader_test.cc",
],
external_deps = [
"gtest",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":test_util",
],
)
grpc_cc_test(
name = "proto_buffer_writer_test",
srcs = [
"proto_buffer_writer_test.cc",
],
external_deps = [
"gtest",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":test_util",
],
)

@ -0,0 +1,88 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <gtest/gtest.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/proto_buffer_reader.h>
#include "test/core/util/test_config.h"
namespace grpc {
namespace {
void ExpectBufferEqual(const ByteBuffer& a, const ByteBuffer& b) {
Slice a_slice;
EXPECT_TRUE(a.DumpToSingleSlice(&a_slice).ok());
Slice b_slice;
EXPECT_TRUE(b.DumpToSingleSlice(&b_slice).ok());
EXPECT_EQ(a_slice.size(), b_slice.size());
EXPECT_EQ(memcmp(a_slice.begin(), b_slice.begin(), a_slice.size()), 0);
}
TEST(ProtoBufferReaderTest, Next) {
Slice slices[] = {
Slice(std::string(128, 'a')),
Slice(std::string(256, 'b')),
};
ByteBuffer buffer(slices, 2);
ProtoBufferReader reader(&buffer);
// read all data from buffer
std::vector<Slice> read_slices;
int read_size = 0;
while (read_size < static_cast<int>(buffer.Length())) {
const void* data;
int size = 0;
EXPECT_TRUE(reader.Next(&data, &size));
read_slices.emplace_back(data, size);
read_size += size;
}
EXPECT_EQ(reader.ByteCount(), read_size);
// check if read data is equal to original data
ByteBuffer read_buffer(&*read_slices.begin(), read_slices.size());
ExpectBufferEqual(read_buffer, buffer);
}
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
TEST(ProtoBufferReaderTest, ReadCord) {
std::string str1 = std::string(128, 'a');
std::string str2 = std::string(256, 'b');
Slice slices[] = {Slice(str1), Slice(str2)};
ByteBuffer buffer(slices, 2);
ProtoBufferReader reader(&buffer);
// read cords from buffer
absl::Cord cord1;
reader.ReadCord(&cord1, str1.size());
EXPECT_EQ(cord1.size(), str1.size());
EXPECT_EQ(std::string(cord1), str1);
absl::Cord cord2;
reader.ReadCord(&cord2, str2.size());
EXPECT_EQ(cord2.size(), str2.size());
EXPECT_EQ(std::string(cord2), str2);
EXPECT_EQ(reader.ByteCount(), cord1.size() + cord2.size());
}
#endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
} // namespace
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,80 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <gtest/gtest.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/proto_buffer_writer.h>
#include "test/core/util/test_config.h"
namespace grpc {
namespace {
TEST(ProtoBufferWriterTest, Next) {
ByteBuffer buffer;
ProtoBufferWriter writer(&buffer, 16, 256);
// 1st next
void* data1;
int size1;
EXPECT_TRUE(writer.Next(&data1, &size1));
EXPECT_GT(size1, 0);
memset(data1, 1, size1);
// 2nd next
void* data2;
int size2;
EXPECT_TRUE(writer.Next(&data2, &size2));
EXPECT_GT(size2, 0);
memset(data2, 2, size2);
// Done
EXPECT_EQ(writer.ByteCount(), size1 + size2);
EXPECT_EQ(buffer.Length(), size1 + size2);
Slice slice;
EXPECT_TRUE(buffer.DumpToSingleSlice(&slice).ok());
EXPECT_EQ(memcmp(slice.begin(), data1, size1), 0);
EXPECT_EQ(memcmp(slice.begin() + size1, data2, size2), 0);
}
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
TEST(ProtoBufferWriterTest, WriteCord) {
ByteBuffer buffer;
ProtoBufferWriter writer(&buffer, 16, 4096);
// Cord
absl::Cord cord;
std::string str1 = std::string(1024, 'a');
cord.Append(str1);
std::string str2 = std::string(1024, 'b');
cord.Append(str2);
writer.WriteCord(cord);
// Done
EXPECT_EQ(writer.ByteCount(), str1.size() + str2.size());
EXPECT_EQ(buffer.Length(), str1.size() + str2.size());
Slice slice;
EXPECT_TRUE(buffer.DumpToSingleSlice(&slice).ok());
EXPECT_EQ(memcmp(slice.begin() + str1.size(), str2.c_str(), str2.size()), 0);
}
#endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
} // namespace
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -5657,6 +5657,54 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "proto_buffer_reader_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "proto_buffer_writer_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save