Fixed hang when using Node gRPC with other async operations

pull/3434/head
murgatroid99 9 years ago
parent 1965810ede
commit e339f6feec
  1. 22
      src/node/ext/completion_queue_async_worker.cc

@ -53,6 +53,9 @@ using v8::Value;
grpc_completion_queue *CompletionQueueAsyncWorker::queue; grpc_completion_queue *CompletionQueueAsyncWorker::queue;
// Invariants: current_threads <= max_queue_threads
// (current_threads == max_queue_threads) || (waiting_next_calls == 0)
int CompletionQueueAsyncWorker::current_threads; int CompletionQueueAsyncWorker::current_threads;
int CompletionQueueAsyncWorker::waiting_next_calls; int CompletionQueueAsyncWorker::waiting_next_calls;
@ -74,11 +77,15 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() { void CompletionQueueAsyncWorker::Next() {
NanScope(); NanScope();
if (current_threads < max_queue_threads) { if (current_threads < max_queue_threads) {
current_threads += 1;
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker); NanAsyncQueueWorker(worker);
} else { } else {
waiting_next_calls += 1; waiting_next_calls += 1;
} }
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
} }
void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
@ -92,11 +99,15 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
NanScope(); NanScope();
if (waiting_next_calls > 0) { if (waiting_next_calls > 0) {
waiting_next_calls -= 1; waiting_next_calls -= 1;
// Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker); NanAsyncQueueWorker(worker);
} else { } else {
current_threads -= 1; current_threads -= 1;
} }
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
NanCallback *callback = GetTagCallback(result.tag); NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)}; Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)};
callback->Call(2, argv); callback->Call(2, argv);
@ -106,6 +117,17 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
void CompletionQueueAsyncWorker::HandleErrorCallback() { void CompletionQueueAsyncWorker::HandleErrorCallback() {
NanScope(); NanScope();
if (waiting_next_calls > 0) {
waiting_next_calls -= 1;
// Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker);
} else {
current_threads -= 1;
}
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
NanCallback *callback = GetTagCallback(result.tag); NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanError(ErrorMessage())}; Handle<Value> argv[] = {NanError(ErrorMessage())};

Loading…
Cancel
Save