|
|
|
@ -42,9 +42,7 @@ class ProducerThread { |
|
|
|
|
public: |
|
|
|
|
ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index, |
|
|
|
|
int num_items) |
|
|
|
|
: start_index_(start_index), |
|
|
|
|
num_items_(num_items), |
|
|
|
|
queue_(queue) { |
|
|
|
|
: start_index_(start_index), num_items_(num_items), queue_(queue) { |
|
|
|
|
items_ = nullptr; |
|
|
|
|
thd_ = grpc_core::Thread( |
|
|
|
|
"mpmcq_test_mt_put_thd", |
|
|
|
@ -77,7 +75,6 @@ class ProducerThread { |
|
|
|
|
WorkItem** items_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void ConsumerThread(void* args) { |
|
|
|
|
grpc_core::InfLenFIFOQueue* queue = |
|
|
|
|
static_cast<grpc_core::InfLenFIFOQueue*>(args); |
|
|
|
@ -104,13 +101,14 @@ static void test_get_empty(void) { |
|
|
|
|
|
|
|
|
|
// Fork threads. Threads should block at the beginning since queue is empty.
|
|
|
|
|
for (int i = 0; i < num_threads; ++i) { |
|
|
|
|
thds[i] = |
|
|
|
|
grpc_core::Thread("mpmcq_test_ge_thd", ConsumerThread, &queue); |
|
|
|
|
thds[i] = grpc_core::Thread("mpmcq_test_ge_thd", ConsumerThread, &queue); |
|
|
|
|
thds[i].Start(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
WorkItem** items = new WorkItem*[THREAD_LARGE_ITERATION]; |
|
|
|
|
for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { |
|
|
|
|
queue.Put(static_cast<void*>(new WorkItem(i))); |
|
|
|
|
items[i] = new WorkItem(i); |
|
|
|
|
queue.Put(static_cast<void*>(items[i])); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "Terminating threads..."); |
|
|
|
@ -120,6 +118,12 @@ static void test_get_empty(void) { |
|
|
|
|
for (int i = 0; i < num_threads; ++i) { |
|
|
|
|
thds[i].Join(); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_DEBUG, "Checking and Cleaning Up..."); |
|
|
|
|
for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { |
|
|
|
|
GPR_ASSERT(items[i]->done); |
|
|
|
|
delete items[i]; |
|
|
|
|
} |
|
|
|
|
delete[] items; |
|
|
|
|
gpr_log(GPR_DEBUG, "Done."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -148,14 +152,14 @@ static void test_many_thread(void) { |
|
|
|
|
gpr_log(GPR_DEBUG, "Fork ProducerThread..."); |
|
|
|
|
for (int i = 0; i < num_work_thd; ++i) { |
|
|
|
|
work_thds[i] = new ProducerThread(&queue, i * THREAD_LARGE_ITERATION, |
|
|
|
|
THREAD_LARGE_ITERATION); |
|
|
|
|
THREAD_LARGE_ITERATION); |
|
|
|
|
work_thds[i]->Start(); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_DEBUG, "ProducerThread Started."); |
|
|
|
|
gpr_log(GPR_DEBUG, "Fork Getter Thread..."); |
|
|
|
|
for (int i = 0; i < num_get_thd; ++i) { |
|
|
|
|
get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", ConsumerThread, |
|
|
|
|
&queue); |
|
|
|
|
get_thds[i] = |
|
|
|
|
grpc_core::Thread("mpmcq_test_mt_get_thd", ConsumerThread, &queue); |
|
|
|
|
get_thds[i].Start(); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_DEBUG, "Getter Thread Started."); |
|
|
|
|