Refactor tag completion handling into one function

pull/10563/head
murgatroid99 8 years ago
parent d587d33ef6
commit 79962f33c7
  1. 32
      src/node/ext/call.cc
  2. 6
      src/node/ext/call.h
  3. 9
      src/node/ext/completion_queue_threadpool.cc
  4. 12
      src/node/ext/completion_queue_uv.cc

@ -499,25 +499,23 @@ tag::~tag() {
delete ops;
}
Local<Value> GetTagNodeValue(void *tag) {
EscapableHandleScope scope;
void CompleteTag(void *tag, const char *error_message) {
HandleScope scope;
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
Local<Object> tag_obj = Nan::New<Object>();
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get();
Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
Callback *callback = tag_struct->callback;
if (error_message == NULL) {
Local<Object> tag_obj = Nan::New<Object>();
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get();
Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
}
Local<Value> argv[] = {Nan::Null(), tag_obj};
callback->Call(2, argv);
} else {
Local<Value> argv[] = {Nan::Error(error_message)};
callback->Call(1, argv);
}
return scope.Escape(tag_obj);
}
Callback *GetTagCallback(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
return tag_struct->callback;
}
void CompleteTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
bool is_final_op = false;
if (tag_struct->call == NULL) {
return;

@ -123,13 +123,9 @@ struct tag {
call_persist;
};
v8::Local<v8::Value> GetTagNodeValue(void *tag);
Nan::Callback *GetTagCallback(void *tag);
void DestroyTag(void *tag);
void CompleteTag(void *tag);
void CompleteTag(void *tag, const char *error_message);
} // namespace node
} // namespace grpc

@ -148,9 +148,7 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope;
current_threads -= 1;
TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
CompleteTag(result.tag, NULL);
DestroyTag(result.tag);
}
@ -159,10 +157,7 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
Nan::HandleScope scope;
current_threads -= 1;
TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Error(ErrorMessage())};
callback->Call(1, argv);
CompleteTag(result.tag, ErrorMessage());
DestroyTag(result.tag);
}

@ -61,17 +61,13 @@ void drain_completion_queue(uv_prepare_t *handle) {
queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
if (event.type == GRPC_OP_COMPLETE) {
Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
const char *error_message;
if (event.success) {
Local<Value> argv[] = {Nan::Null(),
grpc::node::GetTagNodeValue(event.tag)};
callback->Call(2, argv);
error_message = NULL;
} else {
Local<Value> argv[] = {Nan::Error(
"The async function encountered an error")};
callback->Call(1, argv);
error_message = "The async function encountered an error";
}
grpc::node::CompleteTag(event.tag);
CompleteTag(event.tag, error_message);
grpc::node::DestroyTag(event.tag);
pending_batches--;
if (pending_batches == 0) {

Loading…
Cancel
Save