|
|
|
@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker { |
|
|
|
|
void HandleErrorCallback(); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static void TryAddWorker(); |
|
|
|
|
|
|
|
|
|
grpc_event result; |
|
|
|
|
|
|
|
|
|
static grpc_completion_queue *queue; |
|
|
|
@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() { |
|
|
|
|
|
|
|
|
|
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } |
|
|
|
|
|
|
|
|
|
void CompletionQueueAsyncWorker::Next() { |
|
|
|
|
#ifndef GRPC_UV |
|
|
|
|
Nan::HandleScope scope; |
|
|
|
|
if (current_threads < max_queue_threads) { |
|
|
|
|
void CompletionQueueAsyncWorker::TryAddWorker() { |
|
|
|
|
if (current_threads < max_queue_threads && waiting_next_calls > 0) { |
|
|
|
|
current_threads += 1; |
|
|
|
|
waiting_next_calls -= 1; |
|
|
|
|
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); |
|
|
|
|
Nan::AsyncQueueWorker(worker); |
|
|
|
|
} else { |
|
|
|
|
waiting_next_calls += 1; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(current_threads <= max_queue_threads); |
|
|
|
|
GPR_ASSERT((current_threads == max_queue_threads) || |
|
|
|
|
(waiting_next_calls == 0)); |
|
|
|
|
#endif |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CompletionQueueAsyncWorker::Next() { |
|
|
|
|
waiting_next_calls += 1; |
|
|
|
|
TryAddWorker(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CompletionQueueAsyncWorker::Init(Local<Object> exports) { |
|
|
|
@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) { |
|
|
|
|
|
|
|
|
|
void CompletionQueueAsyncWorker::HandleOKCallback() { |
|
|
|
|
Nan::HandleScope scope; |
|
|
|
|
if (waiting_next_calls > 0) { |
|
|
|
|
waiting_next_calls -= 1; |
|
|
|
|
// Old worker removed, new worker added. current_threads += 0
|
|
|
|
|
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); |
|
|
|
|
Nan::AsyncQueueWorker(worker); |
|
|
|
|
} else { |
|
|
|
|
current_threads -= 1; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(current_threads <= max_queue_threads); |
|
|
|
|
GPR_ASSERT((current_threads == max_queue_threads) || |
|
|
|
|
(waiting_next_calls == 0)); |
|
|
|
|
current_threads -= 1; |
|
|
|
|
TryAddWorker(); |
|
|
|
|
Nan::Callback *callback = GetTagCallback(result.tag); |
|
|
|
|
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; |
|
|
|
|
callback->Call(2, argv); |
|
|
|
@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CompletionQueueAsyncWorker::HandleErrorCallback() { |
|
|
|
|
if (waiting_next_calls > 0) { |
|
|
|
|
waiting_next_calls -= 1; |
|
|
|
|
// Old worker removed, new worker added. current_threads += 0
|
|
|
|
|
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); |
|
|
|
|
Nan::AsyncQueueWorker(worker); |
|
|
|
|
} else { |
|
|
|
|
current_threads -= 1; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(current_threads <= max_queue_threads); |
|
|
|
|
GPR_ASSERT((current_threads == max_queue_threads) || |
|
|
|
|
(waiting_next_calls == 0)); |
|
|
|
|
Nan::HandleScope scope; |
|
|
|
|
current_threads -= 1; |
|
|
|
|
TryAddWorker(); |
|
|
|
|
Nan::Callback *callback = GetTagCallback(result.tag); |
|
|
|
|
Local<Value> argv[] = {Nan::Error(ErrorMessage())}; |
|
|
|
|
|
|
|
|
@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CompletionQueueNext() { |
|
|
|
|
gpr_log(GPR_DEBUG, "Called CompletionQueueNext"); |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|