diff --git a/binding.gyp b/binding.gyp index 7dd32546d9e..5455becae10 100644 --- a/binding.gyp +++ b/binding.gyp @@ -859,8 +859,8 @@ "src/node/ext/call_credentials.cc", "src/node/ext/channel.cc", "src/node/ext/channel_credentials.cc", - "src/node/ext/completion_queue.cc", - "src/node/ext/completion_queue_async_worker.cc", + "src/node/ext/completion_queue_threadpool.cc", + "src/node/ext/completion_queue_uv.cc", "src/node/ext/node_grpc.cc", "src/node/ext/server.cc", "src/node/ext/server_credentials.cc", diff --git a/build.yaml b/build.yaml index f86d896f6e3..d552a290b29 100644 --- a/build.yaml +++ b/build.yaml @@ -3710,7 +3710,6 @@ node_modules: - src/node/ext/channel.h - src/node/ext/channel_credentials.h - src/node/ext/completion_queue.h - - src/node/ext/completion_queue_async_worker.h - src/node/ext/server.h - src/node/ext/server_credentials.h - src/node/ext/timeval.h @@ -3729,8 +3728,8 @@ node_modules: - src/node/ext/call_credentials.cc - src/node/ext/channel.cc - src/node/ext/channel_credentials.cc - - src/node/ext/completion_queue.cc - - src/node/ext/completion_queue_async_worker.cc + - src/node/ext/completion_queue_threadpool.cc + - src/node/ext/completion_queue_uv.cc - src/node/ext/node_grpc.cc - src/node/ext/server.cc - src/node/ext/server_credentials.cc diff --git a/src/node/ext/completion_queue.h b/src/node/ext/completion_queue.h index bf280f768b0..9b01028ef17 100644 --- a/src/node/ext/completion_queue.h +++ b/src/node/ext/completion_queue.h @@ -32,6 +32,7 @@ */ #include +#include namespace grpc { namespace node { diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_threadpool.cc similarity index 72% rename from src/node/ext/completion_queue_async_worker.cc rename to src/node/ext/completion_queue_threadpool.cc index f5e03b277b3..6302e7a103b 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -31,18 +31,63 @@ * */ +/* I don't like using #ifndef, but I don't see a better way to do this */ +#ifndef GRPC_UV + #include #include #include "grpc/grpc.h" #include "grpc/support/log.h" #include "grpc/support/time.h" -#include "completion_queue_async_worker.h" +#include "completion_queue.h" #include "call.h" namespace grpc { namespace node { +namespace { + +/* A worker that asynchronously calls completion_queue_next, and queues onto the + node event loop a call to the function stored in the event's tag. */ +class CompletionQueueAsyncWorker : public Nan::AsyncWorker { + public: + CompletionQueueAsyncWorker(); + + ~CompletionQueueAsyncWorker(); + /* Calls completion_queue_next with the provided deadline, and stores the + event if there was one or sets an error message if there was not */ + void Execute(); + + /* Returns the completion queue attached to this class */ + static grpc_completion_queue *GetQueue(); + + /* Convenience function to create a worker with the given arguments and queue + it to run asynchronously */ + static void Next(); + + /* Initialize the CompletionQueueAsyncWorker class */ + static void Init(v8::Local exports); + + protected: + /* Called when Execute has succeeded (completed without setting an error + message). Calls the saved callback with the event that came from + completion_queue_next */ + void HandleOKCallback(); + + void HandleErrorCallback(); + + private: + grpc_event result; + + static grpc_completion_queue *queue; + + // Number of grpc_completion_queue_next calls in the thread pool + static int current_threads; + // Number of grpc_completion_queue_next calls waiting to enter the thread pool + static int waiting_next_calls; +}; + const int max_queue_threads = 2; using v8::Function; @@ -137,5 +182,21 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() { DestroyTag(result.tag); } +} // namespace + +grpc_completion_queue *GetCompletionQueue() { + return CompletionQueueAsyncWorker::GetQueue(); +} + +void CompletionQueueNext() { + CompletionQueueAsyncWorker::Next(); +} + +void CompletionQueueInit(Local exports) { + CompletionQueueAsyncWorker::Init(exports); +} + } // namespace node } // namespace grpc + +#endif /* GRPC_UV */ diff --git a/src/node/ext/completion_queue.cc b/src/node/ext/completion_queue_uv.cc similarity index 93% rename from src/node/ext/completion_queue.cc rename to src/node/ext/completion_queue_uv.cc index fcfa77b39ce..615973a6c9b 100644 --- a/src/node/ext/completion_queue.cc +++ b/src/node/ext/completion_queue_uv.cc @@ -31,6 +31,8 @@ * */ +#ifdef GRPC_UV + #include #include #include @@ -38,7 +40,6 @@ #include "call.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" namespace grpc { namespace node { @@ -81,34 +82,24 @@ void drain_completion_queue(uv_prepare_t *handle) { } grpc_completion_queue *GetCompletionQueue() { -#ifdef GRPC_UV return queue; -#else - return CompletionQueueAsyncWorker::GetQueue(); -#endif } void CompletionQueueNext() { -#ifdef GRPC_UV if (pending_batches == 0) { GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare)); uv_prepare_start(&prepare, drain_completion_queue); } pending_batches++; -#else - CompletionQueueAsyncWorker::Next(); -#endif } void CompletionQueueInit(Local exports) { -#ifdef GRPC_UV queue = grpc_completion_queue_create(NULL); uv_prepare_init(uv_default_loop(), &prepare); pending_batches = 0; -#else - CompletionQueueAsyncWorker::Init(exports); -#endif } } // namespace node } // namespace grpc + +#endif /* GRPC_UV */