From 82f9886e3c89fedf70f91824a36f5404995da4b8 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 20 Jul 2017 10:24:54 -0700
Subject: [PATCH 1/5] Autosize shards for timers

---
 src/core/lib/iomgr/timer_generic.c | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 8ed03620419..155f730e64d 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -23,6 +23,7 @@
 #include "src/core/lib/iomgr/timer.h"
 
 #include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
@@ -35,8 +36,6 @@
 
 #define INVALID_HEAP_INDEX 0xffffffffu
 
-#define LOG2_NUM_SHARDS 5
-#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
 #define ADD_DEADLINE_SCALE 0.33
 #define MIN_QUEUE_WINDOW_DURATION 0.01
 #define MAX_QUEUE_WINDOW_DURATION 1
@@ -70,14 +69,16 @@ typedef struct {
   grpc_timer list;
 } timer_shard;
 
+static size_t g_num_shards;
+
 /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
  * is hashed to select the timer shard to add the timer to */
-static timer_shard g_shards[NUM_SHARDS];
+static timer_shard *g_shards;
 
 /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
  * the deadline of the next timer in each shard).
  * Access to this is protected by g_shared_mutables.mu */
-static timer_shard *g_shard_queue[NUM_SHARDS];
+static timer_shard **g_shard_queue;
 
 /* Thread local variable that stores the deadline of the next timer the thread
  * has last-seen. This is an optimization to prevent the thread from checking
@@ -120,6 +121,10 @@ static gpr_atm compute_min_deadline(timer_shard *shard) {
 void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
   uint32_t i;
 
+  g_num_shards = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+  g_shards = gpr_zalloc(g_num_shards * sizeof(*g_shards));
+  g_shard_queue = gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));
+
   g_shared_mutables.initialized = true;
   gpr_mu_init(&g_shared_mutables.mu);
   g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
@@ -128,7 +133,7 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
   grpc_register_tracer(&grpc_timer_trace);
   grpc_register_tracer(&grpc_timer_check_trace);
 
-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard *shard = &g_shards[i];
     gpr_mu_init(&shard->mu);
     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
@@ -143,17 +148,19 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
 }
 
 void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
-  int i;
+  size_t i;
   run_some_expired_timers(
       exec_ctx, GPR_ATM_MAX, NULL,
       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard *shard = &g_shards[i];
     gpr_mu_destroy(&shard->mu);
     grpc_timer_heap_destroy(&shard->heap);
   }
   gpr_mu_destroy(&g_shared_mutables.mu);
   gpr_tls_destroy(&g_last_seen_min_timer);
+  gpr_free(g_shards);
+  gpr_free(g_shard_queue);
   g_shared_mutables.initialized = false;
 }
 
@@ -187,7 +194,7 @@ static void note_deadline_change(timer_shard *shard) {
          g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
   }
-  while (shard->shard_queue_index < NUM_SHARDS - 1 &&
+  while (shard->shard_queue_index < g_num_shards - 1 &&
          shard->min_deadline > g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index);
   }
@@ -197,7 +204,7 @@ static void note_deadline_change(timer_shard *shard) {
 void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
                      grpc_millis deadline, grpc_closure *closure) {
   int is_first_timer = 0;
-  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   timer->closure = closure;
   timer->deadline = deadline;
 
@@ -283,7 +290,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
     return;
   }
 
-  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   gpr_mu_lock(&shard->mu);
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
     gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
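With this patch the shard count scales to twice the number of CPU cores (clamped below at one) instead of a fixed compile-time 32, while shard selection stays a stateless hash of the timer's address. A minimal sketch of that kind of pointer-hash selection — illustrative only; shard_index() below is a stand-in, not gRPC's GPR_HASH_POINTER macro:

    #include <cstddef>
    #include <cstdint>

    // Illustrative stand-in for GPR_HASH_POINTER: map a timer's address to a
    // shard slot. The low bits are shifted away because allocator alignment
    // leaves them mostly zero, so they carry little entropy.
    static size_t shard_index(const void* timer, size_t num_shards) {
      uintptr_t p = reinterpret_cast<uintptr_t>(timer);
      return static_cast<size_t>((p >> 4) % num_shards);
    }

Because the hash depends only on the timer's address, adding and cancelling a timer always land on the same shard without any cross-shard coordination.
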
From 272eebbbcd74c03dbdf66f96121115bb7f9a2f32 Mon Sep 17 00:00:00 2001
From: yang-g <yangg@google.com>
Date: Wed, 8 Nov 2017 15:25:51 -0800
Subject: [PATCH 2/5] Only allocate what we need in the last slice for proto
 serialization

---
 include/grpc++/impl/codegen/proto_utils.h |  39 ++++----
 test/cpp/codegen/proto_utils_test.cc      | 110 +++++++++++++++++++---
 2 files changed, 119 insertions(+), 30 deletions(-)

diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 0f7e115c9af..b5ad3d8470c 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
 
 class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  public:
-  explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
-      : block_size_(block_size), byte_count_(0), have_backup_(false) {
+  GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+      : block_size_(block_size),
+        total_size_(total_size),
+        byte_count_(0),
+        have_backup_(false) {
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
     slice_buffer_ = &(*bp)->data.raw.slice_buffer;
   }
@@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
   }
 
   bool Next(void** data, int* size) override {
+    // Protobuf should not ask for more memory than total_size_.
+    GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
     if (have_backup_) {
      slice_ = backup_slice_;
       have_backup_ = false;
     } else {
-      slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+      // When less than a whole block is needed, only allocate that much.
+      // But make sure the allocated slice is not inlined.
+      size_t remain = total_size_ - byte_count_ > block_size_
+                          ? block_size_
+                          : total_size_ - byte_count_;
+      slice_ = g_core_codegen_interface->grpc_slice_malloc(
+          remain > GRPC_SLICE_INLINED_SIZE ? remain
+                                           : GRPC_SLICE_INLINED_SIZE + 1);
     }
     *data = GRPC_SLICE_START_PTR(slice_);
     // On win x64, int is only 32bit
@@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
 
   void BackUp(int count) override {
     g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
-    if (count == block_size_) {
+    if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
       backup_slice_ = slice_;
     } else {
       backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  protected:
   friend class GrpcBufferWriterPeer;
   const int block_size_;
+  const int total_size_;
   int64_t byte_count_;
   grpc_slice_buffer* slice_buffer_;
   bool have_backup_;
@@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
                 "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
-  if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
-    grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
-    GPR_CODEGEN_ASSERT(
-        GRPC_SLICE_END_PTR(slice) ==
-        msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
-    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
-    g_core_codegen_interface->grpc_slice_unref(slice);
-    return g_core_codegen_interface->ok();
-  } else {
-    BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
-    return msg.SerializeToZeroCopyStream(&writer)
-               ? g_core_codegen_interface->ok()
-               : Status(StatusCode::INTERNAL, "Failed to serialize message");
-  }
+  BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+  return msg.SerializeToZeroCopyStream(&writer)
+             ? g_core_codegen_interface->ok()
+             : Status(StatusCode::INTERNAL, "Failed to serialize message");
 }
 
 // BufferReader must be a subclass of io::ZeroCopyInputStream.
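For context, GrpcBufferWriter is driven through protobuf's ZeroCopyOutputStream contract: the serializer repeatedly asks the stream for a buffer via Next() and returns the unused tail of the final buffer via BackUp(), which is why those two overrides carry all the sizing logic above. A caller-side sketch of that contract — WriteBytes is a hypothetical helper, not part of gRPC or protobuf:

    #include <cstring>
    #include <google/protobuf/io/zero_copy_stream.h>

    // Hypothetical helper: copy n bytes into any ZeroCopyOutputStream,
    // handing unused space back to the stream via BackUp().
    void WriteBytes(google::protobuf::io::ZeroCopyOutputStream* out,
                    const char* bytes, int n) {
      void* data;
      int size;
      while (n > 0 && out->Next(&data, &size)) {
        int take = size < n ? size : n;
        std::memcpy(data, bytes, take);
        bytes += take;
        n -= take;
        if (take < size) out->BackUp(size - take);  // return the unused tail
      }
    }
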
remain + : GRPC_SLICE_INLINED_SIZE + 1); } *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit @@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { void BackUp(int count) override { g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); - if (count == block_size_) { + if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { backup_slice_ = slice_; } else { backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( @@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { protected: friend class GrpcBufferWriterPeer; const int block_size_; + const int total_size_; int64_t byte_count_; grpc_slice_buffer* slice_buffer_; bool have_backup_; @@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); - if (byte_size <= kGrpcBufferWriterMaxBufferLength) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); - return g_core_codegen_interface->ok(); - } else { - BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? g_core_codegen_interface->ok() - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } + BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); + return msg.SerializeToZeroCopyStream(&writer) + ? g_core_codegen_interface->ok() + : Status(StatusCode::INTERNAL, "Failed to serialize message"); } // BufferReader must be a subclass of io::ZeroCopyInputStream. diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc index fd05c90e9dc..ba89b299ef3 100644 --- a/test/cpp/codegen/proto_utils_test.cc +++ b/test/cpp/codegen/proto_utils_test.cc @@ -16,15 +16,16 @@ * */ +#include #include #include +#include +#include #include namespace grpc { namespace internal { -static GrpcLibraryInitializer g_gli_initializer; - // Provide access to GrpcBufferWriter internals. class GrpcBufferWriterPeer { public: @@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {}; // GrpcBufferWriter Next()/Backup() invocations could result in a dangling // pointer returned by Next() due to the interaction between grpc_slice inlining // and GRPC_SLICE_START_PTR. -TEST_F(ProtoUtilsTest, BackupNext) { - // Ensure the GrpcBufferWriter internals are initialized. - g_gli_initializer.summon(); - +TEST_F(ProtoUtilsTest, TinyBackupThenNext) { grpc_byte_buffer* bp; - GrpcBufferWriter writer(&bp, 8192); + const int block_size = 1024; + GrpcBufferWriter writer(&bp, block_size, 8192); GrpcBufferWriterPeer peer(&writer); void* data; int size; // Allocate a slice. ASSERT_TRUE(writer.Next(&data, &size)); - EXPECT_EQ(8192, size); - // Return a single byte. Before the fix that this test acts as a regression - // for, this would have resulted in an inlined backup slice. + EXPECT_EQ(block_size, size); + // Return a single byte. writer.BackUp(1); - EXPECT_TRUE(!peer.have_backup()); - // On the next allocation, the slice is non-inlined. + EXPECT_FALSE(peer.have_backup()); + // On the next allocation, the returned slice is non-inlined. 
From c88185900d5deb54fe02c3cdde3fc9359e185287 Mon Sep 17 00:00:00 2001
From: yang-g <yangg@google.com>
Date: Wed, 8 Nov 2017 16:33:49 -0800
Subject: [PATCH 3/5] handle 0 byte size message...

---
 include/grpc++/impl/codegen/proto_utils.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index b5ad3d8470c..799d236e0d3 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -188,6 +188,11 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
                 "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
+  if (byte_size == 0) {
+    grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice();
+    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1);
+    return g_core_codegen_interface->ok();
+  }
   BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
   return msg.SerializeToZeroCopyStream(&writer)
              ? g_core_codegen_interface->ok()
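The zero-byte fast path simply wraps a single empty slice in a byte buffer. The same shape in terms of the public C API — a sketch only; EmptyMessageBuffer is a hypothetical helper, and the patch itself routes through g_core_codegen_interface rather than calling these functions directly:

    #include <grpc/byte_buffer.h>
    #include <grpc/slice.h>

    // Hypothetical helper: build the byte buffer a zero-byte message becomes.
    grpc_byte_buffer* EmptyMessageBuffer() {
      grpc_slice empty = grpc_empty_slice();
      // grpc_raw_byte_buffer_create takes its own reference on the slice.
      grpc_byte_buffer* bb = grpc_raw_byte_buffer_create(&empty, 1);
      grpc_slice_unref(empty);  // a no-op for the refcount-less empty slice
      return bb;
    }
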
---
 include/grpc++/impl/codegen/proto_utils.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index b5ad3d8470c..799d236e0d3 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -188,6 +188,11 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
                 "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
+  if (byte_size == 0) {
+    grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice();
+    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1);
+    return g_core_codegen_interface->ok();
+  }
   BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
   return msg.SerializeToZeroCopyStream(&writer)
              ? g_core_codegen_interface->ok()

From b90f0e66e503a8fc675c1cdf3a4301b28a27d62e Mon Sep 17 00:00:00 2001
From: yang-g <yangg@google.com>
Date: Thu, 9 Nov 2017 09:51:29 -0800
Subject: [PATCH 4/5] relax for inlined bytes to avoid allocation

---
 include/grpc++/impl/codegen/proto_utils.h | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 799d236e0d3..b7636034d4f 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -188,9 +188,14 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
                 "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
-  if (byte_size == 0) {
-    grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice();
-    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1);
+  if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
+    grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
+    GPR_CODEGEN_ASSERT(
+        GRPC_SLICE_END_PTR(slice) ==
+        msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
+    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
+    g_core_codegen_interface->grpc_slice_unref(slice);
+    return g_core_codegen_interface->ok();
   }
 
   BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);

From fcfa440b206f86566e25b8a83137c71d8b57fbcb Mon Sep 17 00:00:00 2001
From: Ken Payson <kpayson@google.com>
Date: Thu, 9 Nov 2017 16:45:03 -0800
Subject: [PATCH 5/5] Default to 1 cq per sync server

---
 include/grpc++/server_builder.h | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index bf842baf6fc..0888bef0d95 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -202,10 +202,7 @@ class ServerBuilder {
 
   struct SyncServerSettings {
     SyncServerSettings()
-        : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
-          min_pollers(1),
-          max_pollers(2),
-          cq_timeout_msec(10000) {}
+        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
 
     /// Number of server completion queues to create to listen to incoming RPCs.
     int num_cqs;
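Applications that preferred the old one-completion-queue-per-core default can still ask for it explicitly through the sync-server options ServerBuilder exposes alongside these settings. A sketch, assuming the SetSyncServerOption hook and NUM_CQS option from the same header; UsePerCoreCompletionQueues is a hypothetical helper:

    #include <grpc++/server_builder.h>
    #include <grpc/support/cpu.h>

    // Hypothetical helper: restore the previous default of one completion
    // queue per CPU core on a sync server.
    void UsePerCoreCompletionQueues(grpc::ServerBuilder* builder) {
      builder->SetSyncServerOption(grpc::ServerBuilder::NUM_CQS,
                                   static_cast<int>(gpr_cpu_num_cores()));
    }

Keeping num_cqs at 1 by default trades a little parallelism in RPC dispatch for fewer pollers and lower idle overhead, which suits the common case of small sync servers.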