Merge github.com:grpc/grpc into lfe3

pull/13349/head
Craig Tiller 7 years ago
commit 6def7103bf
4 changed files:
  1. include/grpc++/impl/codegen/proto_utils.h (33 changes)
  2. include/grpc++/server_builder.h (5 changes)
  3. src/core/lib/iomgr/timer_generic.cc (28 changes)
  4. test/cpp/codegen/proto_utils_test.cc (110 changes)

@@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
 class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  public:
-  explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
-      : block_size_(block_size), byte_count_(0), have_backup_(false) {
+  GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+      : block_size_(block_size),
+        total_size_(total_size),
+        byte_count_(0),
+        have_backup_(false) {
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
     slice_buffer_ = &(*bp)->data.raw.slice_buffer;
   }
@@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
   }

   bool Next(void** data, int* size) override {
+    // Protobuf should not ask for more memory than total_size_.
+    GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
     if (have_backup_) {
       slice_ = backup_slice_;
       have_backup_ = false;
     } else {
-      slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+      // When less than a whole block is needed, only allocate that much.
+      // But make sure the allocated slice is not inlined.
+      size_t remain = total_size_ - byte_count_ > block_size_
+                          ? block_size_
+                          : total_size_ - byte_count_;
+      slice_ = g_core_codegen_interface->grpc_slice_malloc(
+          remain > GRPC_SLICE_INLINED_SIZE ? remain
+                                           : GRPC_SLICE_INLINED_SIZE + 1);
     }
     *data = GRPC_SLICE_START_PTR(slice_);
     // On win x64, int is only 32bit
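Context for the "not inlined" guard above: slices no longer than GRPC_SLICE_INLINED_SIZE live inside the grpc_slice struct itself (refcount == nullptr), so a pointer obtained via GRPC_SLICE_START_PTR dangles once such a slice is copied into the slice buffer. A minimal standalone sketch of the distinction, using only the public <grpc/slice.h> API (not part of this diff):

#include <grpc/slice.h>

#include <cassert>

int main() {
  // Payloads this small are stored inline in the grpc_slice struct itself;
  // such slices have no refcount and their bytes move with every copy.
  grpc_slice small = grpc_slice_malloc(GRPC_SLICE_INLINED_SIZE);
  assert(small.refcount == nullptr);

  // One byte past the threshold forces a heap-backed, ref-counted slice,
  // whose GRPC_SLICE_START_PTR stays stable across copies.
  grpc_slice big = grpc_slice_malloc(GRPC_SLICE_INLINED_SIZE + 1);
  assert(big.refcount != nullptr);

  grpc_slice_unref(small);
  grpc_slice_unref(big);
  return 0;
}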
@@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
   void BackUp(int count) override {
     g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
-    if (count == block_size_) {
+    if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
       backup_slice_ = slice_;
     } else {
       backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
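The backup path hinges on grpc_slice_split_tail: the just-popped slice is cut at the written length, the head is re-added to the buffer, and the unwritten tail is kept for the next Next(). A small standalone illustration of the split semantics (not from this patch):

#include <grpc/slice.h>

#include <cassert>

int main() {
  grpc_slice s = grpc_slice_from_copied_string("0123456789");
  // Split at byte 6: |s| keeps the head "012345" (the written part), and the
  // returned tail "6789" plays the role of the backup slice.
  grpc_slice tail = grpc_slice_split_tail(&s, 6);
  assert(GRPC_SLICE_LENGTH(s) == 6);
  assert(GRPC_SLICE_LENGTH(tail) == 4);
  grpc_slice_unref(s);
  grpc_slice_unref(tail);
  return 0;
}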
@@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  protected:
   friend class GrpcBufferWriterPeer;
   const int block_size_;
+  const int total_size_;
   int64_t byte_count_;
   grpc_slice_buffer* slice_buffer_;
   bool have_backup_;
@@ -175,20 +188,20 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
                 "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
-  if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+  if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
     grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
     GPR_CODEGEN_ASSERT(
         GRPC_SLICE_END_PTR(slice) ==
         msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
     g_core_codegen_interface->grpc_slice_unref(slice);
     return g_core_codegen_interface->ok();
-  } else {
-    BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
-    return msg.SerializeToZeroCopyStream(&writer)
-               ? g_core_codegen_interface->ok()
-               : Status(StatusCode::INTERNAL, "Failed to serialize message");
   }
+  BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+  return msg.SerializeToZeroCopyStream(&writer)
+             ? g_core_codegen_interface->ok()
+             : Status(StatusCode::INTERNAL, "Failed to serialize message");
 }

 // BufferReader must be a subclass of io::ZeroCopyInputStream.
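A note on the fast path above: it leans on a protobuf contract where ByteSize() must run before SerializeWithCachedSizesToArray(), since the latter reuses the sizes the former cached. A minimal sketch of that contract outside gRPC; SerializeFlat is a hypothetical name, not part of this change:

#include <google/protobuf/message.h>

#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the fast path above: compute ByteSize() first
// so SerializeWithCachedSizesToArray() can reuse the cached per-field sizes.
std::vector<uint8_t> SerializeFlat(const google::protobuf::Message& msg) {
  std::vector<uint8_t> buf(msg.ByteSize());
  uint8_t* end = msg.SerializeWithCachedSizesToArray(buf.data());
  // On success the cursor lands one past the last byte written, the same
  // invariant the GRPC_SLICE_END_PTR assertion checks in GenericSerialize.
  assert(end == buf.data() + buf.size());
  return buf;
}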

@@ -202,10 +202,7 @@ class ServerBuilder {
   struct SyncServerSettings {
     SyncServerSettings()
-        : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
-          min_pollers(1),
-          max_pollers(2),
-          cq_timeout_msec(10000) {}
+        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}

     /// Number of server completion queues to create to listen to incoming RPCs.
     int num_cqs;
@@ -25,6 +25,7 @@
 #include "src/core/lib/iomgr/timer.h"

 #include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
@@ -37,8 +38,6 @@
 #define INVALID_HEAP_INDEX 0xffffffffu

-#define LOG2_NUM_SHARDS 5
-#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
 #define ADD_DEADLINE_SCALE 0.33
 #define MIN_QUEUE_WINDOW_DURATION 0.01
 #define MAX_QUEUE_WINDOW_DURATION 1
@@ -74,14 +73,16 @@ typedef struct {
   grpc_timer list;
 } timer_shard;

+static size_t g_num_shards;
+
 /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
  * is hashed to select the timer shard to add the timer to */
-static timer_shard g_shards[NUM_SHARDS];
+static timer_shard* g_shards;

 /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
  * the deadline of the next timer in each shard).
  * Access to this is protected by g_shared_mutables.mu */
-static timer_shard* g_shard_queue[NUM_SHARDS];
+static timer_shard** g_shard_queue;

 #ifndef NDEBUG
@@ -241,6 +242,11 @@ static gpr_atm compute_min_deadline(timer_shard* shard) {
 void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
   uint32_t i;

+  g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
+  g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards));
+  g_shard_queue =
+      (timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));
+
   g_shared_mutables.initialized = true;
   g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
   gpr_mu_init(&g_shared_mutables.mu);
@@ -250,7 +256,7 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
   grpc_register_tracer(&grpc_timer_trace);
   grpc_register_tracer(&grpc_timer_check_trace);

-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard* shard = &g_shards[i];
     gpr_mu_init(&shard->mu);
     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
@@ -267,17 +273,19 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
 }

 void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx) {
-  int i;
+  size_t i;
   run_some_expired_timers(
       exec_ctx, GPR_ATM_MAX, nullptr,
       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard* shard = &g_shards[i];
     gpr_mu_destroy(&shard->mu);
     grpc_timer_heap_destroy(&shard->heap);
   }
   gpr_mu_destroy(&g_shared_mutables.mu);
   gpr_tls_destroy(&g_last_seen_min_timer);
+  gpr_free(g_shards);
+  gpr_free(g_shard_queue);
   g_shared_mutables.initialized = false;
 }
@@ -311,7 +319,7 @@ static void note_deadline_change(timer_shard* shard) {
          g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
   }
-  while (shard->shard_queue_index < NUM_SHARDS - 1 &&
+  while (shard->shard_queue_index < g_num_shards - 1 &&
          shard->min_deadline >
              g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index);
@@ -323,7 +331,7 @@ void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }

 void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
                      grpc_millis deadline, grpc_closure* closure) {
   int is_first_timer = 0;
-  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   timer->closure = closure;
   timer->deadline = deadline;
@@ -417,7 +425,7 @@ void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) {
     return;
   }

-  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   gpr_mu_lock(&shard->mu);
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
     gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
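Both updated call sites pick a shard by hashing the timer's address, so grpc_timer_init and grpc_timer_cancel agree on the shard without storing it anywhere, and contention spreads across the per-shard locks. GPR_HASH_POINTER is gpr's macro; the sketch below is an illustrative stand-in, not its exact definition:

#include <cstddef>
#include <cstdio>

// Illustrative stand-in for GPR_HASH_POINTER: discard the low bits (mostly
// zero due to allocator alignment), then reduce modulo the shard count.
static size_t hash_pointer(const void* p, size_t range) {
  return (reinterpret_cast<size_t>(p) >> 4) % range;
}

int main() {
  const size_t num_shards = 4;  // plays the role of g_num_shards
  int timers[8];
  for (int i = 0; i < 8; i++) {
    // Each timer address deterministically maps to one of the shards.
    std::printf("timer %d -> shard %zu\n", i,
                hash_pointer(&timers[i], num_shards));
  }
  return 0;
}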

@@ -16,15 +16,16 @@
  *
  */

+#include <grpc++/impl/codegen/grpc_library.h>
 #include <grpc++/impl/codegen/proto_utils.h>
 #include <grpc++/impl/grpc_library.h>
+#include <grpc/impl/codegen/byte_buffer.h>
 #include <grpc/slice.h>
 #include <gtest/gtest.h>

 namespace grpc {
 namespace internal {

-static GrpcLibraryInitializer g_gli_initializer;
-
 // Provide access to GrpcBufferWriter internals.
 class GrpcBufferWriterPeer {
  public:
@@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {};
 // GrpcBufferWriter Next()/Backup() invocations could result in a dangling
 // pointer returned by Next() due to the interaction between grpc_slice inlining
 // and GRPC_SLICE_START_PTR.
-TEST_F(ProtoUtilsTest, BackupNext) {
-  // Ensure the GrpcBufferWriter internals are initialized.
-  g_gli_initializer.summon();
+TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
   grpc_byte_buffer* bp;
-  GrpcBufferWriter writer(&bp, 8192);
+  const int block_size = 1024;
+  GrpcBufferWriter writer(&bp, block_size, 8192);
   GrpcBufferWriterPeer peer(&writer);

   void* data;
   int size;
   // Allocate a slice.
   ASSERT_TRUE(writer.Next(&data, &size));
-  EXPECT_EQ(8192, size);
-  // Return a single byte. Before the fix that this test acts as a regression
-  // for, this would have resulted in an inlined backup slice.
+  EXPECT_EQ(block_size, size);
+  // Return a single byte.
   writer.BackUp(1);
-  EXPECT_TRUE(!peer.have_backup());
-  // On the next allocation, the slice is non-inlined.
+  EXPECT_FALSE(peer.have_backup());
+  // On the next allocation, the returned slice is non-inlined.
   ASSERT_TRUE(writer.Next(&data, &size));
   EXPECT_TRUE(peer.slice().refcount != nullptr);
+  EXPECT_EQ(block_size, size);

   // Cleanup.
   g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
 }
+namespace {
+
+// Set backup_size to 0 to indicate no backup is needed.
+void BufferWriterTest(int block_size, int total_size, int backup_size) {
+  grpc_byte_buffer* bp;
+  GrpcBufferWriter writer(&bp, block_size, total_size);
+  int written_size = 0;
+  void* data;
+  int size = 0;
+  bool backed_up_entire_slice = false;
+  while (written_size < total_size) {
+    EXPECT_TRUE(writer.Next(&data, &size));
+    EXPECT_GT(size, 0);
+    EXPECT_TRUE(data);
+    int write_size = size;
+    bool should_backup = false;
+    if (backup_size > 0 && size > backup_size) {
+      write_size = size - backup_size;
+      should_backup = true;
+    } else if (size == backup_size && !backed_up_entire_slice) {
+      // only backup entire slice once.
+      backed_up_entire_slice = true;
+      should_backup = true;
+      write_size = 0;
+    }
+    // May need a last backup.
+    if (write_size + written_size > total_size) {
+      write_size = total_size - written_size;
+      should_backup = true;
+      backup_size = size - write_size;
+      ASSERT_GT(backup_size, 0);
+    }
+    for (int i = 0; i < write_size; i++) {
+      ((uint8_t*)data)[i] = written_size % 128;
+      written_size++;
+    }
+    if (should_backup) {
+      writer.BackUp(backup_size);
+    }
+  }
+  EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
+
+  grpc_byte_buffer_reader reader;
+  grpc_byte_buffer_reader_init(&reader, bp);
+  int read_bytes = 0;
+  while (read_bytes < total_size) {
+    grpc_slice s;
+    EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
+    for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
+      EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
+      read_bytes++;
+    }
+    grpc_slice_unref(s);
+  }
+  EXPECT_EQ(read_bytes, total_size);
+  grpc_byte_buffer_reader_destroy(&reader);
+  grpc_byte_buffer_destroy(bp);
+}
+TEST(WriterTest, TinyBlockTinyBackup) {
+  for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
+    BufferWriterTest(i, 256, 1);
+  }
+}
+
+TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
+TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
+TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
+
+TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
+TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
+TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
+TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
+
+}  // namespace

 }  // namespace internal
 }  // namespace grpc
 int main(int argc, char** argv) {
+  // Ensure the GrpcBufferWriter internals are initialized.
+  grpc::internal::GrpcLibraryInitializer init;
+  init.summon();
+  grpc::GrpcLibraryCodegen lib;
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }
