From 7ab95fba08a6b8b8baa7eb0f366b5acb7b4f35da Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 2 Feb 2015 16:50:07 -0800 Subject: [PATCH] Use only some of the thread pool threads for gRPC --- src/node/ext/completion_queue_async_worker.cc | 22 +++++++++++++++++-- src/node/ext/completion_queue_async_worker.h | 5 +++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 8de7db66d50..8db7229e691 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -43,6 +43,8 @@ namespace grpc { namespace node { +const int max_queue_threads = 2; + using v8::Function; using v8::Handle; using v8::Object; @@ -51,6 +53,9 @@ using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; +int CompletionQueueAsyncWorker::current_threads; +int CompletionQueueAsyncWorker::waiting_next_calls; + CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() : NanAsyncWorker(NULL) {} @@ -64,17 +69,30 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { NanScope(); - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + if (current_threads < max_queue_threads) { + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); + } else { + waiting_next_calls += 1; + } } void CompletionQueueAsyncWorker::Init(Handle exports) { NanScope(); + current_threads = 0; + waiting_next_calls = 0; queue = grpc_completion_queue_create(); } void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); + if (waiting_next_calls > 0) { + waiting_next_calls -= 1; + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); + } else { + current_threads -= 1; + } NanCallback event_callback(GetTagHandle(result->tag).As()); Handle argv[] = {CreateEventObject(result)}; diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 2c928b7024f..da586424a0a 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -71,6 +71,11 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { grpc_event *result; static grpc_completion_queue *queue; + + // Number of grpc_completion_queue_next calls in the thread pool + static int current_threads; + // Number of grpc_completion_queue_next calls waiting to enter the thread pool + static int waiting_next_calls; }; } // namespace node