Merge pull request #14845 from markdroth/inlined_vector

Change InlinedVector to keep elements stored contiguously.
pull/14993/head
Mark D. Roth 7 years ago committed by GitHub
commit 03f01fd9ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      src/core/ext/filters/client_channel/client_channel.cc
  2. 64
      src/core/lib/gprpp/inlined_vector.h
  3. 1
      test/core/gprpp/inlined_vector_test.cc

@ -924,7 +924,9 @@ typedef struct client_channel_call_data {
// Note: We inline the cache for the first 3 send_message ops and use // Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked // dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance. // at random; it could be changed in the future to tune performance.
grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; grpc_core::ManualConstructor<
grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
send_messages;
// send_trailing_metadata // send_trailing_metadata
bool seen_send_trailing_metadata; bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage; grpc_linked_mdelem* send_trailing_metadata_storage;
@ -974,7 +976,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
new (cache) grpc_core::ByteStreamCache( new (cache) grpc_core::ByteStreamCache(
std::move(batch->payload->send_message.send_message)); std::move(batch->payload->send_message.send_message));
calld->send_messages.push_back(cache); calld->send_messages->push_back(cache);
} }
// Save metadata batch for send_trailing_metadata ops. // Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) { if (batch->send_trailing_metadata) {
@ -1008,7 +1010,7 @@ static void free_cached_send_op_data_after_commit(
"]", "]",
chand, calld, i); chand, calld, i);
} }
calld->send_messages[i]->Destroy(); (*calld->send_messages)[i]->Destroy();
} }
if (retry_state->completed_send_trailing_metadata) { if (retry_state->completed_send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata); grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@ -1032,7 +1034,7 @@ static void free_cached_send_op_data_for_completed_batch(
"]", "]",
chand, calld, retry_state->completed_send_message_count - 1); chand, calld, retry_state->completed_send_message_count - 1);
} }
calld->send_messages[retry_state->completed_send_message_count - 1] (*calld->send_messages)[retry_state->completed_send_message_count - 1]
->Destroy(); ->Destroy();
} }
if (batch_data->batch.send_trailing_metadata) { if (batch_data->batch.send_trailing_metadata) {
@ -1280,7 +1282,8 @@ static bool pending_batch_is_completed(
return false; return false;
} }
if (pending->batch->send_message && if (pending->batch->send_message &&
retry_state->completed_send_message_count < calld->send_messages.size()) { retry_state->completed_send_message_count <
calld->send_messages->size()) {
return false; return false;
} }
if (pending->batch->send_trailing_metadata && if (pending->batch->send_trailing_metadata &&
@ -1315,7 +1318,7 @@ static bool pending_batch_is_unstarted(
return true; return true;
} }
if (pending->batch->send_message && if (pending->batch->send_message &&
retry_state->started_send_message_count < calld->send_messages.size()) { retry_state->started_send_message_count < calld->send_messages->size()) {
return true; return true;
} }
if (pending->batch->send_trailing_metadata && if (pending->batch->send_trailing_metadata &&
@ -1817,7 +1820,7 @@ static void add_closures_for_replay_or_pending_send_ops(
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops = bool have_pending_send_message_ops =
retry_state->started_send_message_count < calld->send_messages.size(); retry_state->started_send_message_count < calld->send_messages->size();
bool have_pending_send_trailing_metadata_op = bool have_pending_send_trailing_metadata_op =
calld->seen_send_trailing_metadata && calld->seen_send_trailing_metadata &&
!retry_state->started_send_trailing_metadata; !retry_state->started_send_trailing_metadata;
@ -2133,7 +2136,7 @@ static void add_retriable_send_message_op(
chand, calld, retry_state->started_send_message_count); chand, calld, retry_state->started_send_message_count);
} }
grpc_core::ByteStreamCache* cache = grpc_core::ByteStreamCache* cache =
calld->send_messages[retry_state->started_send_message_count]; (*calld->send_messages)[retry_state->started_send_message_count];
++retry_state->started_send_message_count; ++retry_state->started_send_message_count;
batch_data->send_message.Init(cache); batch_data->send_message.Init(cache);
batch_data->batch.send_message = true; batch_data->batch.send_message = true;
@ -2254,7 +2257,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
} }
// send_message. // send_message.
// Note that we can only have one send_message op in flight at a time. // Note that we can only have one send_message op in flight at a time.
if (retry_state->started_send_message_count < calld->send_messages.size() && if (retry_state->started_send_message_count < calld->send_messages->size() &&
retry_state->started_send_message_count == retry_state->started_send_message_count ==
retry_state->completed_send_message_count && retry_state->completed_send_message_count &&
!calld->pending_send_message) { !calld->pending_send_message) {
@ -2274,7 +2277,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// to start, since we can't send down any more send_message ops after // to start, since we can't send down any more send_message ops after
// send_trailing_metadata. // send_trailing_metadata.
if (calld->seen_send_trailing_metadata && if (calld->seen_send_trailing_metadata &&
retry_state->started_send_message_count == calld->send_messages.size() && retry_state->started_send_message_count == calld->send_messages->size() &&
!retry_state->started_send_trailing_metadata && !retry_state->started_send_trailing_metadata &&
!calld->pending_send_trailing_metadata) { !calld->pending_send_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
@ -2325,7 +2328,7 @@ static void add_subchannel_batches_for_pending_batches(
// send_message ops after send_trailing_metadata. // send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata && if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message < (retry_state->started_send_message_count + batch->send_message <
calld->send_messages.size() || calld->send_messages->size() ||
retry_state->started_send_trailing_metadata)) { retry_state->started_send_trailing_metadata)) {
continue; continue;
} }
@ -2976,6 +2979,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem,
calld->deadline); calld->deadline);
} }
calld->enable_retries = chand->enable_retries; calld->enable_retries = chand->enable_retries;
calld->send_messages.Init();
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -3011,6 +3015,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
calld->pick.subchannel_call_context[i].value); calld->pick.subchannel_call_context[i].value);
} }
} }
calld->send_messages.Destroy();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
} }

@ -54,43 +54,43 @@ class InlinedVector {
InlinedVector(const InlinedVector&) = delete; InlinedVector(const InlinedVector&) = delete;
InlinedVector& operator=(const InlinedVector&) = delete; InlinedVector& operator=(const InlinedVector&) = delete;
T* data() {
return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<T*>(inline_);
}
const T* data() const {
return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<const T*>(inline_);
}
T& operator[](size_t offset) { T& operator[](size_t offset) {
assert(offset < size_); assert(offset < size_);
if (offset < N) { return data()[offset];
return *reinterpret_cast<T*>(inline_ + offset);
} else {
return dynamic_[offset - N];
}
} }
const T& operator[](size_t offset) const { const T& operator[](size_t offset) const {
assert(offset < size_); assert(offset < size_);
if (offset < N) { return data()[offset];
return *reinterpret_cast<const T*>(inline_ + offset); }
} else {
return dynamic_[offset - N]; void reserve(size_t capacity) {
if (capacity > capacity_) {
T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity));
for (size_t i = 0; i < size_; ++i) {
new (&new_dynamic[i]) T(std::move(data()[i]));
data()[i].~T();
}
gpr_free(dynamic_);
dynamic_ = new_dynamic;
capacity_ = capacity;
} }
} }
template <typename... Args> template <typename... Args>
void emplace_back(Args&&... args) { void emplace_back(Args&&... args) {
if (size_ < N) { if (size_ == capacity_) {
new (&inline_[size_]) T(std::forward<Args>(args)...); reserve(capacity_ * 2);
} else {
if (size_ - N == dynamic_capacity_) {
size_t new_capacity =
dynamic_capacity_ == 0 ? 2 : dynamic_capacity_ * 2;
T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * new_capacity));
for (size_t i = 0; i < dynamic_capacity_; ++i) {
new (&new_dynamic[i]) T(std::move(dynamic_[i]));
dynamic_[i].~T();
}
gpr_free(dynamic_);
dynamic_ = new_dynamic;
dynamic_capacity_ = new_capacity;
}
new (&dynamic_[size_ - N]) T(std::forward<Args>(args)...);
} }
new (&(data()[size_])) T(std::forward<Args>(args)...);
++size_; ++size_;
} }
@ -99,6 +99,7 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); } void push_back(T&& value) { emplace_back(std::move(value)); }
size_t size() const { return size_; } size_t size() const { return size_; }
size_t capacity() const { return capacity_; }
void clear() { void clear() {
destroy_elements(); destroy_elements();
@ -109,26 +110,21 @@ class InlinedVector {
void init_data() { void init_data() {
dynamic_ = nullptr; dynamic_ = nullptr;
size_ = 0; size_ = 0;
dynamic_capacity_ = 0; capacity_ = N;
} }
void destroy_elements() { void destroy_elements() {
for (size_t i = 0; i < size_ && i < N; ++i) { for (size_t i = 0; i < size_; ++i) {
T& value = *reinterpret_cast<T*>(inline_ + i); T& value = data()[i];
value.~T(); value.~T();
} }
if (size_ > N) { // Avoid subtracting two signed values.
for (size_t i = 0; i < size_ - N; ++i) {
dynamic_[i].~T();
}
}
gpr_free(dynamic_); gpr_free(dynamic_);
} }
typename std::aligned_storage<sizeof(T)>::type inline_[N]; typename std::aligned_storage<sizeof(T)>::type inline_[N];
T* dynamic_; T* dynamic_;
size_t size_; size_t size_;
size_t dynamic_capacity_; size_t capacity_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -33,6 +33,7 @@ TEST(InlinedVectorTest, CreateAndIterate) {
EXPECT_EQ(static_cast<size_t>(kNumElements), v.size()); EXPECT_EQ(static_cast<size_t>(kNumElements), v.size());
for (int i = 0; i < kNumElements; ++i) { for (int i = 0; i < kNumElements; ++i) {
EXPECT_EQ(i, v[i]); EXPECT_EQ(i, v[i]);
EXPECT_EQ(i, &v[i] - &v[0]); // Ensure contiguous allocation.
} }
} }

Loading…
Cancel
Save