diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index bf2cd946a5c..efc611b8b78 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -53,6 +53,9 @@ using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; +// Invariants: current_threads <= max_queue_threads +// (current_threads == max_queue_threads) || (waiting_next_calls == 0) + int CompletionQueueAsyncWorker::current_threads; int CompletionQueueAsyncWorker::waiting_next_calls; @@ -74,11 +77,15 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { NanScope(); if (current_threads < max_queue_threads) { + current_threads += 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); NanAsyncQueueWorker(worker); } else { waiting_next_calls += 1; } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); } void CompletionQueueAsyncWorker::Init(Handle exports) { @@ -92,11 +99,15 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); if (waiting_next_calls > 0) { waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); NanAsyncQueueWorker(worker); } else { current_threads -= 1; } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); NanCallback *callback = GetTagCallback(result.tag); Handle argv[] = {NanNull(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); @@ -106,6 +117,17 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { void CompletionQueueAsyncWorker::HandleErrorCallback() { NanScope(); + if (waiting_next_calls > 0) { + waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); + } else { + current_threads -= 1; + } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); NanCallback *callback = GetTagCallback(result.tag); Handle argv[] = {NanError(ErrorMessage())};