Refactor uv/non-uv code in Node extension

pull/8601/head
murgatroid99 8 years ago
parent fdbf733053
commit 013d203d5d
  1. 4
      binding.gyp
  2. 5
      build.yaml
  3. 1
      src/node/ext/completion_queue.h
  4. 63
      src/node/ext/completion_queue_threadpool.cc
  5. 17
      src/node/ext/completion_queue_uv.cc

@ -859,8 +859,8 @@
"src/node/ext/call_credentials.cc",
"src/node/ext/channel.cc",
"src/node/ext/channel_credentials.cc",
"src/node/ext/completion_queue.cc",
"src/node/ext/completion_queue_async_worker.cc",
"src/node/ext/completion_queue_threadpool.cc",
"src/node/ext/completion_queue_uv.cc",
"src/node/ext/node_grpc.cc",
"src/node/ext/server.cc",
"src/node/ext/server_credentials.cc",

@ -3710,7 +3710,6 @@ node_modules:
- src/node/ext/channel.h
- src/node/ext/channel_credentials.h
- src/node/ext/completion_queue.h
- src/node/ext/completion_queue_async_worker.h
- src/node/ext/server.h
- src/node/ext/server_credentials.h
- src/node/ext/timeval.h
@ -3729,8 +3728,8 @@ node_modules:
- src/node/ext/call_credentials.cc
- src/node/ext/channel.cc
- src/node/ext/channel_credentials.cc
- src/node/ext/completion_queue.cc
- src/node/ext/completion_queue_async_worker.cc
- src/node/ext/completion_queue_threadpool.cc
- src/node/ext/completion_queue_uv.cc
- src/node/ext/node_grpc.cc
- src/node/ext/server.cc
- src/node/ext/server_credentials.cc

@ -32,6 +32,7 @@
*/
#include <v8.h>
#include <grpc/grpc.h>
namespace grpc {
namespace node {

@ -31,18 +31,63 @@
*
*/
/* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/support/log.h"
#include "grpc/support/time.h"
#include "completion_queue_async_worker.h"
#include "completion_queue.h"
#include "call.h"
namespace grpc {
namespace node {
namespace {
/* A worker that asynchronously calls completion_queue_next, and queues onto the
node event loop a call to the function stored in the event's tag. */
class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
public:
CompletionQueueAsyncWorker();
~CompletionQueueAsyncWorker();
/* Calls completion_queue_next with the provided deadline, and stores the
event if there was one or sets an error message if there was not */
void Execute();
/* Returns the completion queue attached to this class */
static grpc_completion_queue *GetQueue();
/* Convenience function to create a worker with the given arguments and queue
it to run asynchronously */
static void Next();
/* Initialize the CompletionQueueAsyncWorker class */
static void Init(v8::Local<v8::Object> exports);
protected:
/* Called when Execute has succeeded (completed without setting an error
message). Calls the saved callback with the event that came from
completion_queue_next */
void HandleOKCallback();
void HandleErrorCallback();
private:
grpc_event result;
static grpc_completion_queue *queue;
// Number of grpc_completion_queue_next calls in the thread pool
static int current_threads;
// Number of grpc_completion_queue_next calls waiting to enter the thread pool
static int waiting_next_calls;
};
const int max_queue_threads = 2;
using v8::Function;
@ -137,5 +182,21 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
DestroyTag(result.tag);
}
} // namespace
grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue();
}
void CompletionQueueNext() {
CompletionQueueAsyncWorker::Next();
}
void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports);
}
} // namespace node
} // namespace grpc
#endif /* GRPC_UV */

@ -31,6 +31,8 @@
*
*/
#ifdef GRPC_UV
#include <uv.h>
#include <node.h>
#include <v8.h>
@ -38,7 +40,6 @@
#include "call.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"
namespace grpc {
namespace node {
@ -81,34 +82,24 @@ void drain_completion_queue(uv_prepare_t *handle) {
}
grpc_completion_queue *GetCompletionQueue() {
#ifdef GRPC_UV
return queue;
#else
return CompletionQueueAsyncWorker::GetQueue();
#endif
}
void CompletionQueueNext() {
#ifdef GRPC_UV
if (pending_batches == 0) {
GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
uv_prepare_start(&prepare, drain_completion_queue);
}
pending_batches++;
#else
CompletionQueueAsyncWorker::Next();
#endif
}
void CompletionQueueInit(Local<Object> exports) {
#ifdef GRPC_UV
queue = grpc_completion_queue_create(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
#else
CompletionQueueAsyncWorker::Init(exports);
#endif
}
} // namespace node
} // namespace grpc
#endif /* GRPC_UV */
Loading…
Cancel
Save