fix undefined reference to operator delete for MPMCQueue interface class

pull/19358/head
Yunjia Wang 6 years ago
parent a633ad3814
commit 6fc7d2b18f
  1. 20
      build.yaml
  2. 44
      src/core/lib/iomgr/threadpool/mpmcqueue.cc
  3. 78
      src/core/lib/iomgr/threadpool/mpmcqueue.h
  4. 43
      test/core/iomgr/mpmcqueue_test.cc

@ -3268,15 +3268,6 @@ targets:
- grpc - grpc
- gpr - gpr
uses_polling: false uses_polling: false
- name: mpmcqueue_test
build: test
language: c
src:
- test/core/iomgr/mpmcqueue_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: multiple_server_queues_test - name: multiple_server_queues_test
build: test build: test
language: c language: c
@ -5239,6 +5230,17 @@ targets:
- grpc++ - grpc++
- grpc - grpc
- gpr - gpr
- name: mpmcqueue_test
build: test
language: c++
src:
- test/core/iomgr/mpmcqueue_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr
- name: nonblocking_test - name: nonblocking_test
gtest: true gtest: true
build: test build: test

@ -16,46 +16,42 @@
* *
*/ */
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/threadpool/mpmcqueue.h" #include "src/core/lib/iomgr/threadpool/mpmcqueue.h"
#include <string.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <string.h>
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
namespace grpc_core { namespace grpc_core {
inline void* MPMCQueue::PopFront() { inline void* MPMCQueue::PopFront() {
void* result = queue_head_->content; void* result = queue_head_->content;
Node* head_to_remove = queue_head_; Node* head_to_remove = queue_head_;
queue_head_ = queue_head_->next; queue_head_ = queue_head_->next;
count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);
gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), gpr_timespec wait_time =
head_to_remove->insert_time); gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), head_to_remove->insert_time);
// gpr_free(head_to_remove);
delete head_to_remove; delete head_to_remove;
// Update Stats info // Update Stats info
stats_.num_completed++; stats_.num_completed++;
stats_.total_queue_cycles = gpr_time_add(stats_.total_queue_cycles, stats_.total_queue_cycles =
wait_time); gpr_time_add(stats_.total_queue_cycles, wait_time);
stats_.max_queue_cycles = gpr_time_max( stats_.max_queue_cycles = gpr_time_max(
gpr_convert_clock_type(stats_.max_queue_cycles, GPR_TIMESPAN), wait_time); gpr_convert_clock_type(stats_.max_queue_cycles, GPR_TIMESPAN), wait_time);
if (count_.Load(MemoryOrder::RELAXED) == 0) { if (count_.Load(MemoryOrder::RELAXED) == 0) {
stats_.busy_time_cycles = gpr_time_add( stats_.busy_time_cycles =
stats_.busy_time_cycles, gpr_time_add(stats_.busy_time_cycles,
gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time)); gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time));
} }
// Singal waiting thread // Singal waiting thread
@ -66,7 +62,8 @@ inline void* MPMCQueue::PopFront() {
return result; return result;
} }
MPMCQueue::MPMCQueue() : num_waiters_(0), queue_head_(0), queue_tail_(0) { MPMCQueue::MPMCQueue() : num_waiters_(0), queue_head_(nullptr),
queue_tail_(nullptr) {
count_.Store(0, MemoryOrder::RELAXED); count_.Store(0, MemoryOrder::RELAXED);
} }
@ -80,10 +77,7 @@ MPMCQueue::~MPMCQueue() {
void MPMCQueue::Put(void* elem) { void MPMCQueue::Put(void* elem) {
MutexLock l(&mu_); MutexLock l(&mu_);
// Node* new_node = static_cast<Node*>(gpr_malloc(sizeof(Node)));
// new_node->next = nullptr;
// new_node->content = elem;
// new_node->insert_time = gpr_now(GPR_CLOCK_PRECISE);
Node* new_node = static_cast<Node*>(new Node(elem)); Node* new_node = static_cast<Node*>(new Node(elem));
if (count_.Load(MemoryOrder::RELAXED) == 0) { if (count_.Load(MemoryOrder::RELAXED) == 0) {
busy_time = gpr_now(GPR_CLOCK_PRECISE); busy_time = gpr_now(GPR_CLOCK_PRECISE);
@ -131,12 +125,12 @@ void MPMCQueue::PrintStats() {
MPMCQueue::Stats* MPMCQueue::queue_stats() { MPMCQueue::Stats* MPMCQueue::queue_stats() {
MPMCQueue::Stats* result = new Stats(); MPMCQueue::Stats* result = new Stats();
MutexLock l(&mu_); MutexLock l(&mu_);
result->total_queue_cycles = gpr_time_add(result->total_queue_cycles, result->total_queue_cycles =
stats_.total_queue_cycles); gpr_time_add(result->total_queue_cycles, stats_.total_queue_cycles);
result->max_queue_cycles = gpr_time_add(result->max_queue_cycles, result->max_queue_cycles =
stats_.max_queue_cycles); gpr_time_add(result->max_queue_cycles, stats_.max_queue_cycles);
result->busy_time_cycles = gpr_time_add(result->busy_time_cycles, result->busy_time_cycles =
stats_.busy_time_cycles); gpr_time_add(result->busy_time_cycles, stats_.busy_time_cycles);
return result; return result;
} }

@ -16,15 +16,15 @@
* *
*/ */
#ifndef GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H #ifndef GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H
#define GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H #define GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H
#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <grpc/support/alloc.h>
#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include <grpc/support/time.h>
namespace grpc_core { namespace grpc_core {
@ -36,7 +36,7 @@ class MPMCQueueInterface {
// Put elem into queue immediately at the end of queue. // Put elem into queue immediately at the end of queue.
// This might cause to block on full queue depending on implementation. // This might cause to block on full queue depending on implementation.
virtual void Put(void *elem) = 0; virtual void Put(void* elem) = 0;
// Remove the oldest element from the queue and return it. // Remove the oldest element from the queue and return it.
// This might cause to block on empty queue depending on implementation. // This might cause to block on empty queue depending on implementation.
@ -48,16 +48,16 @@ class MPMCQueueInterface {
class MPMCQueue : public MPMCQueueInterface { class MPMCQueue : public MPMCQueueInterface {
public: public:
struct Stats { // Stats of queue struct Stats { // Stats of queue
uint64_t num_started; // Number of elements have been added to queue uint64_t num_started; // Number of elements have been added to queue
uint64_t num_completed; // Number of elements have been removed from uint64_t num_completed; // Number of elements have been removed from
// the queue // the queue
gpr_timespec total_queue_cycles; // Total waiting time that all the gpr_timespec total_queue_cycles; // Total waiting time that all the
// removed elements have spent in queue // removed elements have spent in queue
gpr_timespec max_queue_cycles; // Max waiting time among all removed gpr_timespec max_queue_cycles; // Max waiting time among all removed
// elements // elements
gpr_timespec busy_time_cycles; // Accumulated amount of time that queue gpr_timespec busy_time_cycles; // Accumulated amount of time that queue
// was not empty // was not empty
Stats() { Stats() {
num_started = 0; num_started = 0;
@ -66,23 +66,7 @@ class MPMCQueue : public MPMCQueueInterface {
max_queue_cycles = gpr_time_0(GPR_TIMESPAN); max_queue_cycles = gpr_time_0(GPR_TIMESPAN);
busy_time_cycles = gpr_time_0(GPR_TIMESPAN); busy_time_cycles = gpr_time_0(GPR_TIMESPAN);
} }
void* operator new(size_t n) {
void* p = gpr_malloc(n);
return p;
}
void operator delete(void* p) {
gpr_free(p);
}
}; };
void* operator new(size_t n) {
void* p = gpr_malloc(n);
return p;
}
void operator delete(void* p) {
gpr_free(p);
}
// Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created // Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created
// will have infinite length. // will have infinite length.
explicit MPMCQueue(); explicit MPMCQueue();
@ -115,34 +99,26 @@ class MPMCQueue : public MPMCQueueInterface {
void* PopFront(); void* PopFront();
struct Node { struct Node {
Node *next; // Linking Node* next; // Linking
void *content; // Points to actual element void* content; // Points to actual element
gpr_timespec insert_time; // Time for stats gpr_timespec insert_time; // Time for stats
Node(void* c) : content(c) { Node(void* c) : content(c) {
next = nullptr; next = nullptr;
insert_time = gpr_now(GPR_CLOCK_PRECISE); insert_time = gpr_now(GPR_CLOCK_PRECISE);
} }
void* operator new(size_t n) {
void* p = gpr_malloc(n);
return p;
}
void operator delete(void* p) {
gpr_free(p);
}
}; };
Mutex mu_; // Protecting lock Mutex mu_; // Protecting lock
CondVar wait_nonempty_; // Wait on empty queue on get CondVar wait_nonempty_; // Wait on empty queue on get
int num_waiters_; // Number of waiters int num_waiters_; // Number of waiters
Node *queue_head_; // Head of the queue, remove position Node* queue_head_; // Head of the queue, remove position
Node *queue_tail_; // End of queue, insert position Node* queue_tail_; // End of queue, insert position
Atomic<uint64_t> count_; // Number of elements in queue Atomic<uint64_t> count_; // Number of elements in queue
Stats stats_; // Stats info Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue gpr_timespec busy_time; // Start time of busy queue
}; };
} // namespace grpc_core } // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H */ #endif /* GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H */

@ -42,17 +42,7 @@ struct WorkItem {
int index; int index;
bool done; bool done;
WorkItem(int i) : index(i) { WorkItem(int i) : index(i) { done = false; }
done = false;
}
void* operator new(size_t n) {
void* p = gpr_malloc(n);
return p;
}
void operator delete(void* p) {
gpr_free(p);
}
}; };
static void test_small_queue(void) { static void test_small_queue(void) {
@ -132,38 +122,28 @@ static void test_large_queue(void) {
class WorkThread { class WorkThread {
public: public:
WorkThread(grpc_core::MPMCQueue* mpmcqueue, int start_index, int num_items) WorkThread(grpc_core::MPMCQueue* mpmcqueue, int start_index, int num_items)
: start_index_(start_index), num_items_(num_items), : start_index_(start_index),
num_items_(num_items),
mpmcqueue_(mpmcqueue) { mpmcqueue_(mpmcqueue) {
items_ = NULL; items_ = NULL;
thd_ = grpc_core::Thread( thd_ = grpc_core::Thread(
"mpmcq_test_mt_put_thd", "mpmcq_test_mt_put_thd",
[](void* th) { static_cast<WorkThread*>(th)->Run(); }, [](void* th) { static_cast<WorkThread*>(th)->Run(); }, this);
this);
} }
~WorkThread() { ~WorkThread() {
for (int i = 0; i < num_items_; ++i) { for (int i = 0; i < num_items_; ++i) {
GPR_ASSERT(items_[i]->done); GPR_ASSERT(items_[i]->done);
delete items_[i]; delete items_[i];
} }
gpr_free(items_); delete[] items_;
} }
void Start() { thd_.Start(); } void Start() { thd_.Start(); }
void Join() { thd_.Join(); } void Join() { thd_.Join(); }
void* operator new(size_t n) {
void* p = gpr_malloc(n);
return p;
}
void operator delete(void* p) {
gpr_free(p);
}
private: private:
void Run() { void Run() {
items_ = static_cast<WorkItem**>( items_ = new WorkItem*[num_items_];
gpr_malloc(sizeof(WorkItem*) * num_items_));
for (int i = 0; i < num_items_; ++i) { for (int i = 0; i < num_items_; ++i) {
items_[i] = new WorkItem(start_index_ + i); items_[i] = new WorkItem(start_index_ + i);
mpmcqueue_->Put(items_[i]); mpmcqueue_->Put(items_[i]);
@ -177,7 +157,6 @@ class WorkThread {
WorkItem** items_; WorkItem** items_;
}; };
static void test_many_get_thd(void* args) { static void test_many_get_thd(void* args) {
grpc_core::MPMCQueue* mpmcqueue = static_cast<grpc_core::MPMCQueue*>(args); grpc_core::MPMCQueue* mpmcqueue = static_cast<grpc_core::MPMCQueue*>(args);
@ -199,8 +178,7 @@ static void test_many_thread(void) {
const int num_work_thd = 10; const int num_work_thd = 10;
const int num_get_thd = 20; const int num_get_thd = 20;
grpc_core::MPMCQueue mpmcqueue; grpc_core::MPMCQueue mpmcqueue;
WorkThread** work_thds = WorkThread** work_thds = new WorkThread*[num_work_thd];
static_cast<WorkThread**>(gpr_malloc(sizeof(WorkThread*) * num_work_thd));
grpc_core::Thread get_thds[num_get_thd]; grpc_core::Thread get_thds[num_get_thd];
gpr_log(GPR_DEBUG, "Fork WorkThread..."); gpr_log(GPR_DEBUG, "Fork WorkThread...");
@ -212,8 +190,8 @@ static void test_many_thread(void) {
gpr_log(GPR_DEBUG, "WorkThread Started."); gpr_log(GPR_DEBUG, "WorkThread Started.");
gpr_log(GPR_DEBUG, "For Getter Thread..."); gpr_log(GPR_DEBUG, "For Getter Thread...");
for (int i = 0; i < num_get_thd; ++i) { for (int i = 0; i < num_get_thd; ++i) {
get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", test_many_get_thd,
test_many_get_thd, &mpmcqueue); &mpmcqueue);
get_thds[i].Start(); get_thds[i].Start();
} }
gpr_log(GPR_DEBUG, "Getter Thread Started."); gpr_log(GPR_DEBUG, "Getter Thread Started.");
@ -234,11 +212,10 @@ static void test_many_thread(void) {
for (int i = 0; i < num_work_thd; ++i) { for (int i = 0; i < num_work_thd; ++i) {
delete work_thds[i]; delete work_thds[i];
} }
gpr_free(work_thds); delete[] work_thds;
gpr_log(GPR_DEBUG, "Done."); gpr_log(GPR_DEBUG, "Done.");
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv); grpc::testing::TestEnvironment env(argc, argv);
grpc_init(); grpc_init();

Loading…
Cancel
Save