Add LIFO and Chunked List

pull/19696/head
Yunjia Wang 6 years ago
parent 1e607d3dc0
commit f50301fde8
  1. 120
      src/core/lib/iomgr/executor/mpmcqueue.cc
  2. 68
      src/core/lib/iomgr/executor/mpmcqueue.h
  3. 45
      test/core/iomgr/mpmcqueue_test.cc

@ -26,18 +26,15 @@ DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");
inline void* InfLenFIFOQueue::PopFront() { inline void* InfLenFIFOQueue::PopFront() {
// Caller should already check queue is not empty and has already held the // Caller should already check queue is not empty and has already held the
// mutex. This function will only do the job of removal. // mutex. This function will assume that there is at least one element in the
// queue (aka queue_head_ content is valid).
void* result = queue_head_->content; void* result = queue_head_->content;
Node* head_to_remove = queue_head_;
queue_head_ = queue_head_->next;
count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);
// Updates Stats when trace flag turned on.
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
gpr_timespec wait_time = gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), head_to_remove->insert_time); queue_head_->insert_time);
// Updates Stats info
stats_.num_completed++; stats_.num_completed++;
stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time); stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
stats_.max_queue_time = gpr_time_max( stats_.max_queue_time = gpr_time_max(
@ -58,44 +55,87 @@ inline void* InfLenFIFOQueue::PopFront() {
gpr_timespec_to_micros(stats_.busy_queue_time)); gpr_timespec_to_micros(stats_.busy_queue_time));
} }
Delete(head_to_remove); queue_head_ = queue_head_->next;
// Signal waiting thread // Signal waiting thread
if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) { if (count_.Load(MemoryOrder::RELAXED) > 0) {
wait_nonempty_.Signal(); TopWaiter()->cv.Signal();
} }
return result; return result;
} }
InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) {
Node* new_chunk = static_cast<Node*>(gpr_zalloc(sizeof(Node) * num));
new_chunk[0].next = &new_chunk[1];
new_chunk[num - 1].prev = &new_chunk[num - 2];
for (int i = 1; i < num - 1; ++i) {
new_chunk[i].prev = &new_chunk[i - 1];
new_chunk[i].next = &new_chunk[i + 1];
}
return new_chunk;
}
InfLenFIFOQueue::InfLenFIFOQueue() {
delete_list_size_ = 1024;
delete_list_ =
static_cast<Node**>(gpr_zalloc(sizeof(Node*) * delete_list_size_));
Node* new_chunk = AllocateNodes(1024);
delete_list_[delete_list_count_++] = new_chunk;
queue_head_ = queue_tail_ = new_chunk;
new_chunk[0].prev = &new_chunk[1023];
new_chunk[1023].next = &new_chunk[0];
waiters_.next = &waiters_;
waiters_.prev = &waiters_;
}
InfLenFIFOQueue::~InfLenFIFOQueue() { InfLenFIFOQueue::~InfLenFIFOQueue() {
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0); GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
GPR_ASSERT(num_waiters_ == 0); for (size_t i = 0; i < delete_list_count_; ++i) {
gpr_free(delete_list_[i]);
}
gpr_free(delete_list_);
} }
void InfLenFIFOQueue::Put(void* elem) { void InfLenFIFOQueue::Put(void* elem) {
MutexLock l(&mu_); MutexLock l(&mu_);
Node* new_node = New<Node>(elem); int curr_count = count_.Load(MemoryOrder::RELAXED);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { if (queue_tail_ == queue_head_ && curr_count!= 0) {
busy_time = gpr_now(GPR_CLOCK_MONOTONIC); // List is full. Expands list to double size by inserting new chunk of nodes
Node* new_chunk = AllocateNodes(curr_count);
delete_list_[delete_list_count_++] = new_chunk;
// Expands delete list on full.
if (delete_list_count_ == delete_list_size_) {
delete_list_size_ = delete_list_size_ * 2;
delete_list_ = static_cast<Node**>(
gpr_realloc(delete_list_, sizeof(Node*) * delete_list_size_));
} }
queue_head_ = queue_tail_ = new_node; new_chunk[0].prev = queue_tail_->prev;
} else { new_chunk[curr_count - 1].next = queue_head_;
queue_tail_->next = new_node; queue_tail_->prev->next = new_chunk;
queue_tail_ = queue_tail_->next; queue_head_->prev = &new_chunk[curr_count - 1];
queue_tail_ = new_chunk;
} }
count_.Store(count_.Load(MemoryOrder::RELAXED) + 1, MemoryOrder::RELAXED); queue_tail_->content = static_cast<void*>(elem);
// Updates Stats info // Updates Stats info
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
stats_.num_started++; stats_.num_started++;
gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64, gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64,
stats_.num_started); stats_.num_started);
if (curr_count == 0) {
busy_time = gpr_now(GPR_CLOCK_MONOTONIC);
} }
queue_tail_->insert_time = gpr_now(GPR_CLOCK_MONOTONIC);
if (num_waiters_ > 0) {
wait_nonempty_.Signal();
} }
count_.Store(curr_count + 1, MemoryOrder::RELAXED);
queue_tail_ = queue_tail_->next;
TopWaiter()->cv.Signal();
} }
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
@ -108,11 +148,12 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
start_time = gpr_now(GPR_CLOCK_MONOTONIC); start_time = gpr_now(GPR_CLOCK_MONOTONIC);
} }
num_waiters_++; Waiter self;
PushWaiter(&self);
do { do {
wait_nonempty_.Wait(&mu_); self.cv.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0); } while (count_.Load(MemoryOrder::RELAXED) == 0);
num_waiters_--; RemoveWaiter(&self);
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) { wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
@ -122,4 +163,29 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
return PopFront(); return PopFront();
} }
size_t InfLenFIFOQueue::num_node() {
size_t num = 1024;
for (size_t i = 1; i < delete_list_count_; ++i) {
num = num* 2;
}
return num;
}
void InfLenFIFOQueue::PushWaiter(Waiter* waiter) {
waiter->next = waiters_.next;
waiter->prev = &waiters_;
waiter->next->prev = waiter;
waiter->prev->next = waiter;
}
void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
GPR_DEBUG_ASSERT(waiter != &waiters_);
waiter->next->prev = waiter->prev;
waiter->prev->next = waiter->next;
}
InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() {
return waiters_.next;
}
} // namespace grpc_core } // namespace grpc_core

@ -54,7 +54,7 @@ class MPMCQueueInterface {
class InfLenFIFOQueue : public MPMCQueueInterface { class InfLenFIFOQueue : public MPMCQueueInterface {
public: public:
// Creates a new MPMC Queue. The queue created will have infinite length. // Creates a new MPMC Queue. The queue created will have infinite length.
InfLenFIFOQueue() {} InfLenFIFOQueue();
// Releases all resources held by the queue. The queue must be empty, and no // Releases all resources held by the queue. The queue must be empty, and no
// one waits on conditional variables. // one waits on conditional variables.
@ -66,8 +66,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// Removes the oldest element from the queue and returns it. // Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty. // This routine will cause the thread to block if queue is currently empty.
// Argument wait_time should be passed in when turning on the trace flag // Argument wait_time should be passed in when trace flag turning on (for
// grpc_thread_pool_trace (for collecting stats info purpose.) // collecting stats info purpose.)
void* Get(gpr_timespec* wait_time = nullptr); void* Get(gpr_timespec* wait_time = nullptr);
// Returns number of elements in queue currently. // Returns number of elements in queue currently.
@ -75,24 +75,30 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// quickly. // quickly.
int count() const { return count_.Load(MemoryOrder::RELAXED); } int count() const { return count_.Load(MemoryOrder::RELAXED); }
private:
// For Internal Use Only.
// Removes the oldest element from the queue and returns it. This routine
// will NOT check whether queue is empty, and it will NOT acquire mutex.
// Caller should do the check and acquire mutex before callling.
void* PopFront();
struct Node { struct Node {
Node* next; // Linking Node* next; // Linking
Node* prev;
void* content; // Points to actual element void* content; // Points to actual element
gpr_timespec insert_time; // Time for stats gpr_timespec insert_time; // Time for stats
Node(void* c) : content(c) { Node() {
next = nullptr; next = prev = nullptr;
insert_time = gpr_now(GPR_CLOCK_MONOTONIC); content = nullptr;
} }
}; };
// For test purpose only. Returns number of nodes allocated in queue.
// All allocated nodes will not be free until destruction of queue.
size_t num_node();
private:
// For Internal Use Only.
// Removes the oldest element from the queue and returns it. This routine
// will NOT check whether queue is empty, and it will NOT acquire mutex.
// Caller MUST check that queue is not empty and must acquire mutex before
// callling.
void* PopFront();
// Stats of queue. This will only be collect when debug trace mode is on. // Stats of queue. This will only be collect when debug trace mode is on.
// All printed stats info will have time measurement in microsecond. // All printed stats info will have time measurement in microsecond.
struct Stats { struct Stats {
@ -115,15 +121,47 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
} }
}; };
// Node for waiting thread queue. Stands for one waiting thread, should have
// exact one thread waiting on its CondVar.
// Using a doubly linked list for waiting thread queue to wake up waiting
// threads in LIFO order to reduce cache misses.
struct Waiter {
CondVar cv;
Waiter* next;
Waiter* prev;
};
// Pushs waiter to the front of queue, require caller held mutex
void PushWaiter(Waiter* waiter);
// Removes waiter from queue, require caller held mutex
void RemoveWaiter(Waiter* waiter);
// Returns pointer to the waiter that should be waken up next, should be the
// last added waiter.
Waiter* TopWaiter();
Mutex mu_; // Protecting lock Mutex mu_; // Protecting lock
CondVar wait_nonempty_; // Wait on empty queue on get Waiter waiters_; // Head of waiting thread queue
int num_waiters_ = 0; // Number of waiters
Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
// for deleting on destruction
size_t delete_list_count_ = 0; // Number of entries in list
size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
// double size on full
Node* queue_head_ = nullptr; // Head of the queue, remove position Node* queue_head_ = nullptr; // Head of the queue, remove position
Node* queue_tail_ = nullptr; // End of queue, insert position Node* queue_tail_ = nullptr; // End of queue, insert position
Atomic<int> count_{0}; // Number of elements in queue Atomic<int> count_{0}; // Number of elements in queue
Stats stats_; // Stats info Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue gpr_timespec busy_time; // Start time of busy queue
// Internal Helper.
// Allocates an array of nodes of size "num", links all nodes together except
// the first node's prev and last node's next. They should be set by caller
// manually afterward.
Node* AllocateNodes(int num);
}; };
} // namespace grpc_core } // namespace grpc_core

@ -119,6 +119,50 @@ static void test_FIFO(void) {
} }
} }
static void test_space_efficiency(void) {
gpr_log(GPR_INFO, "test_space_efficiency");
grpc_core::InfLenFIFOQueue queue;
for (int i = 0; i < 1024; ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
GPR_ASSERT(queue.num_node() == 1024);
for (int i = 0; i < 1024; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
queue.Put(item);
}
GPR_ASSERT(queue.num_node() == 1024);
for (int i = 0; i < 1024; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
GPR_ASSERT(queue.num_node() == 1024);
GPR_ASSERT(queue.count() == 0);
// queue empty now
for (int i = 0; i < 4000; ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
GPR_ASSERT(queue.count() == 4000);
GPR_ASSERT(queue.num_node() == 4096);
for (int i = 0; i < 2000; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
GPR_ASSERT(queue.count() == 2000);
GPR_ASSERT(queue.num_node() == 4096);
for (int i = 0; i < 1000; ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
GPR_ASSERT(queue.count() == 3000);
GPR_ASSERT(queue.num_node() == 4096);
for (int i = 0; i < 3000; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
GPR_ASSERT(queue.count() == 0);
GPR_ASSERT(queue.num_node() == 4096);
gpr_log(GPR_DEBUG, "Done.");
}
static void test_many_thread(void) { static void test_many_thread(void) {
gpr_log(GPR_INFO, "test_many_thread"); gpr_log(GPR_INFO, "test_many_thread");
const int num_producer_threads = 10; const int num_producer_threads = 10;
@ -172,6 +216,7 @@ int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv); grpc::testing::TestEnvironment env(argc, argv);
grpc_init(); grpc_init();
test_FIFO(); test_FIFO();
test_space_efficiency();
test_many_thread(); test_many_thread();
grpc_shutdown(); grpc_shutdown();
return 0; return 0;

Loading…
Cancel
Save