mirror of https://github.com/grpc/grpc.git
parent
e4adb6ac56
commit
a633ad3814
6 changed files with 568 additions and 0 deletions
@ -0,0 +1,143 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/threadpool/mpmcqueue.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/cpu.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
|
||||
|
||||
inline void* MPMCQueue::PopFront() { |
||||
void* result = queue_head_->content; |
||||
Node* head_to_remove = queue_head_; |
||||
queue_head_ = queue_head_->next; |
||||
|
||||
count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); |
||||
gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), |
||||
head_to_remove->insert_time); |
||||
// gpr_free(head_to_remove);
|
||||
delete head_to_remove; |
||||
|
||||
// Update Stats info
|
||||
stats_.num_completed++; |
||||
stats_.total_queue_cycles = gpr_time_add(stats_.total_queue_cycles, |
||||
wait_time); |
||||
stats_.max_queue_cycles = gpr_time_max( |
||||
gpr_convert_clock_type(stats_.max_queue_cycles, GPR_TIMESPAN), wait_time); |
||||
|
||||
if (count_.Load(MemoryOrder::RELAXED) == 0) { |
||||
stats_.busy_time_cycles = gpr_time_add( |
||||
stats_.busy_time_cycles, |
||||
gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time)); |
||||
} |
||||
|
||||
// Singal waiting thread
|
||||
if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) { |
||||
wait_nonempty_.Signal(); |
||||
} |
||||
|
||||
return result; |
||||
} |
||||
|
||||
MPMCQueue::MPMCQueue() : num_waiters_(0), queue_head_(0), queue_tail_(0) { |
||||
count_.Store(0, MemoryOrder::RELAXED); |
||||
} |
||||
|
||||
MPMCQueue::~MPMCQueue() { |
||||
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0); |
||||
ReleasableMutexLock l(&mu_); |
||||
GPR_ASSERT(num_waiters_ == 0); |
||||
l.Unlock(); |
||||
PrintStats(); |
||||
} |
||||
|
||||
void MPMCQueue::Put(void* elem) { |
||||
MutexLock l(&mu_); |
||||
// Node* new_node = static_cast<Node*>(gpr_malloc(sizeof(Node)));
|
||||
// new_node->next = nullptr;
|
||||
// new_node->content = elem;
|
||||
// new_node->insert_time = gpr_now(GPR_CLOCK_PRECISE);
|
||||
Node* new_node = static_cast<Node*>(new Node(elem)); |
||||
if (count_.Load(MemoryOrder::RELAXED) == 0) { |
||||
busy_time = gpr_now(GPR_CLOCK_PRECISE); |
||||
queue_head_ = queue_tail_ = new_node; |
||||
} else { |
||||
queue_tail_->next = new_node; |
||||
queue_tail_ = queue_tail_->next; |
||||
} |
||||
count_.Store(count_.Load(MemoryOrder::RELAXED) + 1, MemoryOrder::RELAXED); |
||||
|
||||
// Update Stats info
|
||||
stats_.num_started++; |
||||
|
||||
if (num_waiters_ > 0) { |
||||
wait_nonempty_.Signal(); |
||||
} |
||||
} |
||||
|
||||
void* MPMCQueue::Get() { |
||||
MutexLock l(&mu_); |
||||
if (count_.Load(MemoryOrder::RELAXED) == 0) { |
||||
num_waiters_++; |
||||
do { |
||||
wait_nonempty_.Wait(&mu_); |
||||
} while (count_.Load(MemoryOrder::RELAXED) == 0); |
||||
num_waiters_--; |
||||
} |
||||
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0); |
||||
return PopFront(); |
||||
} |
||||
|
||||
void MPMCQueue::PrintStats() { |
||||
MutexLock l(&mu_); |
||||
gpr_log(GPR_INFO, "STATS INFO:"); |
||||
gpr_log(GPR_INFO, "num_started: %lu", stats_.num_started); |
||||
gpr_log(GPR_INFO, "num_completed: %lu", stats_.num_completed); |
||||
gpr_log(GPR_INFO, "total_queue_cycles: %d", |
||||
gpr_time_to_millis(stats_.total_queue_cycles)); |
||||
gpr_log(GPR_INFO, "max_queue_cycles: %d", |
||||
gpr_time_to_millis(stats_.max_queue_cycles)); |
||||
gpr_log(GPR_INFO, "busy_time_cycles: %d", |
||||
gpr_time_to_millis(stats_.busy_time_cycles)); |
||||
} |
||||
|
||||
MPMCQueue::Stats* MPMCQueue::queue_stats() { |
||||
MPMCQueue::Stats* result = new Stats(); |
||||
MutexLock l(&mu_); |
||||
result->total_queue_cycles = gpr_time_add(result->total_queue_cycles, |
||||
stats_.total_queue_cycles); |
||||
result->max_queue_cycles = gpr_time_add(result->max_queue_cycles, |
||||
stats_.max_queue_cycles); |
||||
result->busy_time_cycles = gpr_time_add(result->busy_time_cycles, |
||||
stats_.busy_time_cycles); |
||||
return result; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,148 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H |
||||
#define GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include "src/core/lib/gprpp/atomic.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include <grpc/support/time.h> |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Abstract base class of a MPMC queue interface
|
||||
class MPMCQueueInterface { |
||||
public: |
||||
MPMCQueueInterface() {} |
||||
virtual ~MPMCQueueInterface() {} |
||||
|
||||
// Put elem into queue immediately at the end of queue.
|
||||
// This might cause to block on full queue depending on implementation.
|
||||
virtual void Put(void *elem) = 0; |
||||
|
||||
// Remove the oldest element from the queue and return it.
|
||||
// This might cause to block on empty queue depending on implementation.
|
||||
virtual void* Get() = 0; |
||||
|
||||
// Return number of elements in the queue currently
|
||||
virtual int count() const = 0; |
||||
}; |
||||
|
||||
class MPMCQueue : public MPMCQueueInterface { |
||||
public: |
||||
struct Stats { // Stats of queue
|
||||
uint64_t num_started; // Number of elements have been added to queue
|
||||
uint64_t num_completed; // Number of elements have been removed from
|
||||
// the queue
|
||||
gpr_timespec total_queue_cycles; // Total waiting time that all the
|
||||
// removed elements have spent in queue
|
||||
gpr_timespec max_queue_cycles; // Max waiting time among all removed
|
||||
// elements
|
||||
gpr_timespec busy_time_cycles; // Accumulated amount of time that queue
|
||||
// was not empty
|
||||
|
||||
Stats() { |
||||
num_started = 0; |
||||
num_completed = 0; |
||||
total_queue_cycles = gpr_time_0(GPR_TIMESPAN); |
||||
max_queue_cycles = gpr_time_0(GPR_TIMESPAN); |
||||
busy_time_cycles = gpr_time_0(GPR_TIMESPAN); |
||||
} |
||||
void* operator new(size_t n) { |
||||
void* p = gpr_malloc(n); |
||||
return p; |
||||
} |
||||
|
||||
void operator delete(void* p) { |
||||
gpr_free(p); |
||||
} |
||||
}; |
||||
void* operator new(size_t n) { |
||||
void* p = gpr_malloc(n); |
||||
return p; |
||||
} |
||||
|
||||
void operator delete(void* p) { |
||||
gpr_free(p); |
||||
} |
||||
// Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created
|
||||
// will have infinite length.
|
||||
explicit MPMCQueue(); |
||||
|
||||
// Release all resources hold by the queue. The queue must be empty, and no
|
||||
// one waiting on conditional variables.
|
||||
~MPMCQueue(); |
||||
|
||||
// Put elem into queue immediately at the end of queue. Since the queue has
|
||||
// infinite length, this routine will never block and should never fail.
|
||||
void Put(void* elem); |
||||
|
||||
// Remove the oldest element from the queue and return it.
|
||||
// This routine will cause the thread to block if queue is currently empty.
|
||||
void* Get(); |
||||
|
||||
// Return number of elements in queue currently.
|
||||
// There might be concurrently add/remove on queue, so count might change
|
||||
// quickly.
|
||||
int count() const { return count_.Load(MemoryOrder::RELAXED); } |
||||
|
||||
// Print out Stats. Time measurement are printed in millisecond.
|
||||
void PrintStats(); |
||||
|
||||
// Return a copy of current stats info. This info will be changed quickly
|
||||
// when queue is still running. This copy will not deleted by queue.
|
||||
Stats* queue_stats(); |
||||
|
||||
private: |
||||
void* PopFront(); |
||||
|
||||
struct Node { |
||||
Node *next; // Linking
|
||||
void *content; // Points to actual element
|
||||
gpr_timespec insert_time; // Time for stats
|
||||
Node(void* c) : content(c) { |
||||
next = nullptr; |
||||
insert_time = gpr_now(GPR_CLOCK_PRECISE); |
||||
} |
||||
void* operator new(size_t n) { |
||||
void* p = gpr_malloc(n); |
||||
return p; |
||||
} |
||||
|
||||
void operator delete(void* p) { |
||||
gpr_free(p); |
||||
} |
||||
}; |
||||
|
||||
Mutex mu_; // Protecting lock
|
||||
CondVar wait_nonempty_; // Wait on empty queue on get
|
||||
int num_waiters_; // Number of waiters
|
||||
|
||||
Node *queue_head_; // Head of the queue, remove position
|
||||
Node *queue_tail_; // End of queue, insert position
|
||||
Atomic<uint64_t> count_; // Number of elements in queue
|
||||
Stats stats_; // Stats info
|
||||
gpr_timespec busy_time; // Start time of busy queue
|
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H */ |
@ -0,0 +1,253 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/iomgr/threadpool/mpmcqueue.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/thd.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
#define THREAD_SMALL_ITERATION 100 |
||||
#define THREAD_LARGE_ITERATION 10000 |
||||
|
||||
static void test_no_op(void) { |
||||
gpr_log(GPR_DEBUG, "test_no_op"); |
||||
grpc_core::MPMCQueue mpmcqueue; |
||||
gpr_log(GPR_DEBUG, "Checking count..."); |
||||
GPR_ASSERT(mpmcqueue.count() == 0); |
||||
gpr_log(GPR_DEBUG, "Done."); |
||||
} |
||||
|
||||
// Testing items for queue
|
||||
struct WorkItem { |
||||
int index; |
||||
bool done; |
||||
|
||||
WorkItem(int i) : index(i) { |
||||
done = false; |
||||
} |
||||
void* operator new(size_t n) { |
||||
void* p = gpr_malloc(n); |
||||
return p; |
||||
} |
||||
|
||||
void operator delete(void* p) { |
||||
gpr_free(p); |
||||
} |
||||
}; |
||||
|
||||
static void test_small_queue(void) { |
||||
gpr_log(GPR_DEBUG, "test_small_queue"); |
||||
grpc_core::MPMCQueue small_queue; |
||||
for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { |
||||
small_queue.Put(static_cast<void*>(new WorkItem(i))); |
||||
} |
||||
GPR_ASSERT(small_queue.count() == THREAD_SMALL_ITERATION); |
||||
// Get items out in FIFO order
|
||||
for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { |
||||
WorkItem* item = static_cast<WorkItem*>(small_queue.Get()); |
||||
GPR_ASSERT(i == item->index); |
||||
delete item; |
||||
} |
||||
} |
||||
|
||||
static void test_get_thd(void* args) { |
||||
grpc_core::MPMCQueue* mpmcqueue = static_cast<grpc_core::MPMCQueue*>(args); |
||||
|
||||
// count number of Get() called in this thread
|
||||
int count = 0; |
||||
int last_index = -1; |
||||
WorkItem* item; |
||||
while ((item = static_cast<WorkItem*>(mpmcqueue->Get())) != NULL) { |
||||
count++; |
||||
GPR_ASSERT(item->index > last_index); |
||||
last_index = item->index; |
||||
GPR_ASSERT(!item->done); |
||||
delete item; |
||||
} |
||||
|
||||
gpr_log(GPR_DEBUG, "test_get_thd: %d times of Get() called.", count); |
||||
} |
||||
|
||||
static void test_get_empty(void) { |
||||
gpr_log(GPR_DEBUG, "test_get_empty"); |
||||
grpc_core::MPMCQueue mpmcqueue; |
||||
const int num_threads = 10; |
||||
grpc_core::Thread thds[num_threads]; |
||||
|
||||
// Fork threads. Threads should block at the beginning since queue is empty.
|
||||
for (int i = 0; i < num_threads; ++i) { |
||||
thds[i] = grpc_core::Thread("mpmcq_test_ge_thd", test_get_thd, &mpmcqueue); |
||||
thds[i].Start(); |
||||
} |
||||
|
||||
for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { |
||||
mpmcqueue.Put(static_cast<void*>(new WorkItem(i))); |
||||
} |
||||
|
||||
gpr_log(GPR_DEBUG, "Terminating threads..."); |
||||
for (int i = 0; i < num_threads; ++i) { |
||||
mpmcqueue.Put(NULL); |
||||
} |
||||
for (int i = 0; i < num_threads; ++i) { |
||||
thds[i].Join(); |
||||
} |
||||
gpr_log(GPR_DEBUG, "Done."); |
||||
} |
||||
|
||||
static void test_large_queue(void) { |
||||
gpr_log(GPR_DEBUG, "test_large_queue"); |
||||
grpc_core::MPMCQueue large_queue; |
||||
for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { |
||||
large_queue.Put(static_cast<void*>(new WorkItem(i))); |
||||
} |
||||
GPR_ASSERT(large_queue.count() == THREAD_LARGE_ITERATION); |
||||
for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { |
||||
WorkItem* item = static_cast<WorkItem*>(large_queue.Get()); |
||||
GPR_ASSERT(i == item->index); |
||||
delete item; |
||||
} |
||||
} |
||||
|
||||
// Thread for put items into queue
|
||||
class WorkThread { |
||||
public: |
||||
WorkThread(grpc_core::MPMCQueue* mpmcqueue, int start_index, int num_items) |
||||
: start_index_(start_index), num_items_(num_items), |
||||
mpmcqueue_(mpmcqueue) { |
||||
items_ = NULL; |
||||
thd_ = grpc_core::Thread( |
||||
"mpmcq_test_mt_put_thd", |
||||
[](void* th) { static_cast<WorkThread*>(th)->Run(); }, |
||||
this); |
||||
} |
||||
~WorkThread() { |
||||
for (int i = 0; i < num_items_; ++i) { |
||||
GPR_ASSERT(items_[i]->done); |
||||
delete items_[i]; |
||||
} |
||||
gpr_free(items_); |
||||
} |
||||
|
||||
void Start() { thd_.Start(); } |
||||
void Join() { thd_.Join(); } |
||||
|
||||
void* operator new(size_t n) { |
||||
void* p = gpr_malloc(n); |
||||
return p; |
||||
} |
||||
|
||||
void operator delete(void* p) { |
||||
gpr_free(p); |
||||
} |
||||
|
||||
private: |
||||
void Run() { |
||||
items_ = static_cast<WorkItem**>( |
||||
gpr_malloc(sizeof(WorkItem*) * num_items_)); |
||||
for (int i = 0; i < num_items_; ++i) { |
||||
items_[i] = new WorkItem(start_index_ + i); |
||||
mpmcqueue_->Put(items_[i]); |
||||
} |
||||
} |
||||
|
||||
int start_index_; |
||||
int num_items_; |
||||
grpc_core::MPMCQueue* mpmcqueue_; |
||||
grpc_core::Thread thd_; |
||||
WorkItem** items_; |
||||
}; |
||||
|
||||
|
||||
static void test_many_get_thd(void* args) { |
||||
grpc_core::MPMCQueue* mpmcqueue = static_cast<grpc_core::MPMCQueue*>(args); |
||||
|
||||
// count number of Get() called in this thread
|
||||
int count = 0; |
||||
|
||||
WorkItem* item; |
||||
while ((item = static_cast<WorkItem*>(mpmcqueue->Get())) != NULL) { |
||||
count++; |
||||
GPR_ASSERT(!item->done); |
||||
item->done = true; |
||||
} |
||||
|
||||
gpr_log(GPR_DEBUG, "test_many_get_thd: %d times of Get() called.", count); |
||||
} |
||||
|
||||
static void test_many_thread(void) { |
||||
gpr_log(GPR_DEBUG, "test_many_thread"); |
||||
const int num_work_thd = 10; |
||||
const int num_get_thd = 20; |
||||
grpc_core::MPMCQueue mpmcqueue; |
||||
WorkThread** work_thds = |
||||
static_cast<WorkThread**>(gpr_malloc(sizeof(WorkThread*) * num_work_thd)); |
||||
grpc_core::Thread get_thds[num_get_thd]; |
||||
|
||||
gpr_log(GPR_DEBUG, "Fork WorkThread..."); |
||||
for (int i = 0; i < num_work_thd; ++i) { |
||||
work_thds[i] = new WorkThread(&mpmcqueue, i * THREAD_LARGE_ITERATION, |
||||
THREAD_LARGE_ITERATION); |
||||
work_thds[i]->Start(); |
||||
} |
||||
gpr_log(GPR_DEBUG, "WorkThread Started."); |
||||
gpr_log(GPR_DEBUG, "For Getter Thread..."); |
||||
for (int i = 0; i < num_get_thd; ++i) { |
||||
get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", |
||||
test_many_get_thd, &mpmcqueue); |
||||
get_thds[i].Start(); |
||||
} |
||||
gpr_log(GPR_DEBUG, "Getter Thread Started."); |
||||
gpr_log(GPR_DEBUG, "Waiting WorkThread to finish..."); |
||||
for (int i = 0; i < num_work_thd; ++i) { |
||||
work_thds[i]->Join(); |
||||
} |
||||
gpr_log(GPR_DEBUG, "All WorkThread Terminated."); |
||||
gpr_log(GPR_DEBUG, "Terminating Getter Thread..."); |
||||
for (int i = 0; i < num_get_thd; ++i) { |
||||
mpmcqueue.Put(NULL); |
||||
} |
||||
for (int i = 0; i < num_get_thd; ++i) { |
||||
get_thds[i].Join(); |
||||
} |
||||
gpr_log(GPR_DEBUG, "All Getter Thread Terminated."); |
||||
gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up..."); |
||||
for (int i = 0; i < num_work_thd; ++i) { |
||||
delete work_thds[i]; |
||||
} |
||||
gpr_free(work_thds); |
||||
gpr_log(GPR_DEBUG, "Done."); |
||||
} |
||||
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(argc, argv); |
||||
grpc_init(); |
||||
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); |
||||
test_no_op(); |
||||
test_small_queue(); |
||||
test_get_empty(); |
||||
test_large_queue(); |
||||
test_many_thread(); |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
Loading…
Reference in new issue