diff --git a/grpc.def b/grpc.def index e0a08d22c19..922f95383a3 100644 --- a/grpc.def +++ b/grpc.def @@ -149,6 +149,7 @@ EXPORTS grpc_byte_buffer_reader_init grpc_byte_buffer_reader_destroy grpc_byte_buffer_reader_next + grpc_byte_buffer_reader_peek grpc_byte_buffer_reader_readall grpc_raw_byte_buffer_from_reader gpr_log_severity_string diff --git a/include/grpc/impl/codegen/byte_buffer.h b/include/grpc/impl/codegen/byte_buffer.h index 774655ed66f..12479068155 100644 --- a/include/grpc/impl/codegen/byte_buffer.h +++ b/include/grpc/impl/codegen/byte_buffer.h @@ -73,6 +73,19 @@ GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader); GRPCAPI int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, grpc_slice* slice); +/** EXPERIMENTAL API - This function may be removed and changed, in the future. + * + * Updates \a slice with the next piece of data from from \a reader and returns + * 1. Returns 0 at the end of the stream. Caller is responsible for making sure + * the slice pointer remains valid when accessed. + * + * NOTE: Do not use this function unless the caller can guarantee that the + * underlying grpc_byte_buffer outlasts the use of the slice. This is only + * safe when the underlying grpc_byte_buffer remains immutable while slice + * is being accessed. */ +GRPCAPI int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice); + /** Merge all data from \a reader into single slice */ GRPCAPI grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader); diff --git a/include/grpcpp/impl/codegen/core_codegen.h b/include/grpcpp/impl/codegen/core_codegen.h index b7ddb0c791c..27729e0d5db 100644 --- a/include/grpcpp/impl/codegen/core_codegen.h +++ b/include/grpcpp/impl/codegen/core_codegen.h @@ -85,6 +85,8 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_byte_buffer_reader* reader) override; int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, grpc_slice* slice) override; + int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice) override; grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) override; diff --git a/include/grpcpp/impl/codegen/core_codegen_interface.h b/include/grpcpp/impl/codegen/core_codegen_interface.h index 1d92b4f0dff..3792c3d4693 100644 --- a/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -92,6 +92,8 @@ class CoreCodegenInterface { grpc_byte_buffer_reader* reader) = 0; virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, grpc_slice* slice) = 0; + virtual int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice) = 0; virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) = 0; diff --git a/include/grpcpp/impl/codegen/proto_buffer_reader.h b/include/grpcpp/impl/codegen/proto_buffer_reader.h index 9acae476b11..734da366f3a 100644 --- a/include/grpcpp/impl/codegen/proto_buffer_reader.h +++ b/include/grpcpp/impl/codegen/proto_buffer_reader.h @@ -73,7 +73,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { } /// If we have backed up previously, we need to return the backed-up slice if (backup_count_ > 0) { - *data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) - + *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) - backup_count_; GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX); *size = (int)backup_count_; @@ -81,15 +81,14 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { return true; } /// Otherwise get the next slice from the byte buffer reader - if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_, + if (!g_core_codegen_interface->grpc_byte_buffer_reader_peek(&reader_, &slice_)) { return false; } - g_core_codegen_interface->grpc_slice_unref(slice_); - *data = GRPC_SLICE_START_PTR(slice_); + *data = GRPC_SLICE_START_PTR(*slice_); // On win x64, int is only 32bit - GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); + GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX); + byte_count_ += * size = (int)GRPC_SLICE_LENGTH(*slice_); return true; } @@ -100,7 +99,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { /// bytes that have already been returned by the last call of Next. /// So do the backup and have that ready for a later Next. void BackUp(int count) override { - GPR_CODEGEN_ASSERT(count <= static_cast(GRPC_SLICE_LENGTH(slice_))); + GPR_CODEGEN_ASSERT(count <= static_cast(GRPC_SLICE_LENGTH(*slice_))); backup_count_ = count; } @@ -135,14 +134,15 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { int64_t backup_count() { return backup_count_; } void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; } grpc_byte_buffer_reader* reader() { return &reader_; } - grpc_slice* slice() { return &slice_; } + grpc_slice* slice() { return slice_; } + grpc_slice** mutable_slice_ptr() { return &slice_; } private: int64_t byte_count_; ///< total bytes read since object creation int64_t backup_count_; ///< how far backed up in the stream we are grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice ///< from the \a grpc_byte_buffer - grpc_slice slice_; ///< current slice passed back to the caller + grpc_slice* slice_; ///< current slice passed back to the caller Status status_; ///< status of the entire object }; diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc index 1debc98ea0c..ed8ecc49590 100644 --- a/src/core/lib/surface/byte_buffer_reader.cc +++ b/src/core/lib/surface/byte_buffer_reader.cc @@ -91,6 +91,23 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) { } } +int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice) { + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: { + grpc_slice_buffer* slice_buffer; + slice_buffer = &reader->buffer_out->data.raw.slice_buffer; + if (reader->current.index < slice_buffer->count) { + *slice = &slice_buffer->slices[reader->current.index]; + reader->current.index += 1; + return 1; + } + break; + } + } + return 0; +} + int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, grpc_slice* slice) { switch (reader->buffer_in->type) { diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index ab5f601fdd4..665305ca0a5 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -139,6 +139,11 @@ int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, return ::grpc_byte_buffer_reader_next(reader, slice); } +int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice) { + return ::grpc_byte_buffer_reader_peek(reader, slice); +} + grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) { return ::grpc_raw_byte_buffer_create(slice, nslices); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index fdbe0df4e52..f8a31286115 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -172,6 +172,7 @@ grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import; grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import; grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_import; grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import; +grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import; grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import; grpc_raw_byte_buffer_from_reader_type grpc_raw_byte_buffer_from_reader_import; gpr_log_severity_string_type gpr_log_severity_string_import; @@ -440,6 +441,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_byte_buffer_reader_init_import = (grpc_byte_buffer_reader_init_type) GetProcAddress(library, "grpc_byte_buffer_reader_init"); grpc_byte_buffer_reader_destroy_import = (grpc_byte_buffer_reader_destroy_type) GetProcAddress(library, "grpc_byte_buffer_reader_destroy"); grpc_byte_buffer_reader_next_import = (grpc_byte_buffer_reader_next_type) GetProcAddress(library, "grpc_byte_buffer_reader_next"); + grpc_byte_buffer_reader_peek_import = (grpc_byte_buffer_reader_peek_type) GetProcAddress(library, "grpc_byte_buffer_reader_peek"); grpc_byte_buffer_reader_readall_import = (grpc_byte_buffer_reader_readall_type) GetProcAddress(library, "grpc_byte_buffer_reader_readall"); grpc_raw_byte_buffer_from_reader_import = (grpc_raw_byte_buffer_from_reader_type) GetProcAddress(library, "grpc_raw_byte_buffer_from_reader"); gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index cf16f0ca33b..275ca6e9cbf 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -491,6 +491,9 @@ extern grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_impo typedef int(*grpc_byte_buffer_reader_next_type)(grpc_byte_buffer_reader* reader, grpc_slice* slice); extern grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import; #define grpc_byte_buffer_reader_next grpc_byte_buffer_reader_next_import +typedef int(*grpc_byte_buffer_reader_peek_type)(grpc_byte_buffer_reader* reader, grpc_slice** slice); +extern grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import; +#define grpc_byte_buffer_reader_peek grpc_byte_buffer_reader_peek_import typedef grpc_slice(*grpc_byte_buffer_reader_readall_type)(grpc_byte_buffer_reader* reader); extern grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import; #define grpc_byte_buffer_reader_readall grpc_byte_buffer_reader_readall_import diff --git a/test/core/surface/byte_buffer_reader_test.cc b/test/core/surface/byte_buffer_reader_test.cc index 301a1e283ba..bc368c49657 100644 --- a/test/core/surface/byte_buffer_reader_test.cc +++ b/test/core/surface/byte_buffer_reader_test.cc @@ -101,6 +101,73 @@ static void test_read_none_compressed_slice(void) { grpc_byte_buffer_destroy(buffer); } +static void test_peek_one_slice(void) { + grpc_slice slice; + grpc_byte_buffer* buffer; + grpc_byte_buffer_reader reader; + grpc_slice* first_slice; + grpc_slice* second_slice; + int first_code, second_code; + + LOG_TEST("test_peek_one_slice"); + slice = grpc_slice_from_copied_string("test"); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + grpc_slice_unref(slice); + GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) && + "Couldn't init byte buffer reader"); + first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice); + GPR_ASSERT(first_code != 0); + GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0); + second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice); + GPR_ASSERT(second_code == 0); + grpc_byte_buffer_destroy(buffer); +} + +static void test_peek_one_slice_malloc(void) { + grpc_slice slice; + grpc_byte_buffer* buffer; + grpc_byte_buffer_reader reader; + grpc_slice* first_slice; + grpc_slice* second_slice; + int first_code, second_code; + + LOG_TEST("test_peek_one_slice_malloc"); + slice = grpc_slice_malloc(4); + memcpy(GRPC_SLICE_START_PTR(slice), "test", 4); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + grpc_slice_unref(slice); + GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) && + "Couldn't init byte buffer reader"); + first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice); + GPR_ASSERT(first_code != 0); + GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0); + second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice); + GPR_ASSERT(second_code == 0); + grpc_byte_buffer_destroy(buffer); +} + +static void test_peek_none_compressed_slice(void) { + grpc_slice slice; + grpc_byte_buffer* buffer; + grpc_byte_buffer_reader reader; + grpc_slice* first_slice; + grpc_slice* second_slice; + int first_code, second_code; + + LOG_TEST("test_peek_none_compressed_slice"); + slice = grpc_slice_from_copied_string("test"); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + grpc_slice_unref(slice); + GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) && + "Couldn't init byte buffer reader"); + first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice); + GPR_ASSERT(first_code != 0); + GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0); + second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice); + GPR_ASSERT(second_code == 0); + grpc_byte_buffer_destroy(buffer); +} + static void test_read_corrupted_slice(void) { grpc_slice slice; grpc_byte_buffer* buffer; @@ -271,6 +338,9 @@ int main(int argc, char** argv) { test_read_one_slice(); test_read_one_slice_malloc(); test_read_none_compressed_slice(); + test_peek_one_slice(); + test_peek_one_slice_malloc(); + test_peek_none_compressed_slice(); test_read_gzip_compressed_slice(); test_read_deflate_compressed_slice(); test_read_corrupted_slice(); diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index 04d0506b3c2..fa02e76ec92 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -209,6 +209,7 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) grpc_byte_buffer_reader_init); printf("%lx", (unsigned long) grpc_byte_buffer_reader_destroy); printf("%lx", (unsigned long) grpc_byte_buffer_reader_next); + printf("%lx", (unsigned long) grpc_byte_buffer_reader_peek); printf("%lx", (unsigned long) grpc_byte_buffer_reader_readall); printf("%lx", (unsigned long) grpc_raw_byte_buffer_from_reader); printf("%lx", (unsigned long) gpr_log_severity_string); diff --git a/test/cpp/microbenchmarks/bm_byte_buffer.cc b/test/cpp/microbenchmarks/bm_byte_buffer.cc index a359e6f6212..644c27c4873 100644 --- a/test/cpp/microbenchmarks/bm_byte_buffer.cc +++ b/test/cpp/microbenchmarks/bm_byte_buffer.cc @@ -29,9 +29,8 @@ namespace grpc { namespace testing { -auto& force_library_initialization = Library::get(); - static void BM_ByteBuffer_Copy(benchmark::State& state) { + Library::get(); int num_slices = state.range(0); size_t slice_size = state.range(1); std::vector slices; @@ -48,6 +47,74 @@ static void BM_ByteBuffer_Copy(benchmark::State& state) { } BENCHMARK(BM_ByteBuffer_Copy)->Ranges({{1, 64}, {1, 1024 * 1024}}); +static void BM_ByteBufferReader_Next(benchmark::State& state) { + Library::get(); + const int num_slices = state.range(0); + constexpr size_t kSliceSize = 16; + std::vector slices; + for (int i = 0; i < num_slices; ++i) { + std::unique_ptr buf(new char[kSliceSize]); + slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer( + buf.get(), kSliceSize)); + } + grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create( + slices.data(), num_slices); + grpc_byte_buffer_reader reader; + GPR_ASSERT( + g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb)); + while (state.KeepRunning()) { + grpc_slice* slice; + if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek( + &reader, &slice))) { + g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader); + GPR_ASSERT( + g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb)); + continue; + } + } + + g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader); + g_core_codegen_interface->grpc_byte_buffer_destroy(bb); + for (auto& slice : slices) { + g_core_codegen_interface->grpc_slice_unref(slice); + } +} +BENCHMARK(BM_ByteBufferReader_Next)->Ranges({{64 * 1024, 1024 * 1024}}); + +static void BM_ByteBufferReader_Peek(benchmark::State& state) { + Library::get(); + const int num_slices = state.range(0); + constexpr size_t kSliceSize = 16; + std::vector slices; + for (int i = 0; i < num_slices; ++i) { + std::unique_ptr buf(new char[kSliceSize]); + slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer( + buf.get(), kSliceSize)); + } + grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create( + slices.data(), num_slices); + grpc_byte_buffer_reader reader; + GPR_ASSERT( + g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb)); + while (state.KeepRunning()) { + grpc_slice* slice; + if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek( + &reader, &slice))) { + g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader); + GPR_ASSERT( + g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb)); + continue; + } + } + + g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader); + g_core_codegen_interface->grpc_byte_buffer_destroy(bb); + for (auto& slice : slices) { + g_core_codegen_interface->grpc_slice_unref(slice); + } +} +BENCHMARK(BM_ByteBufferReader_Peek)->Ranges({{64 * 1024, 1024 * 1024}}); + } // namespace testing } // namespace grpc