diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index d2e930bc0ff..85dcb3cd079 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,24 +31,26 @@ * */ +#include #include #include #include "grpc/support/log.h" #include "grpc/grpc.h" +#include "grpc/support/alloc.h" #include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" #include "channel.h" #include "completion_queue_async_worker.h" #include "timeval.h" -#include "tag.h" namespace grpc { namespace node { using ::node::Buffer; +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Exception; @@ -70,9 +72,11 @@ using v8::Value; Persistent Call::constructor; Persistent Call::fun_tpl; -bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array - vector *string_handles, - vector*> *handles) { + +bool CreateMetadataArray( + Handle metadata, grpc_metadata_array *array, + std::vector > *string_handles, + std::vector > *handles) { NanScope(); Handle keys(metadata->GetOwnPropertyNames()); for (unsigned int i = 0; i < keys->Length(); i++) { @@ -82,27 +86,27 @@ bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array } array->capacity += Local::Cast(metadata->Get(current_key))->Length(); } - array->metadata = calloc(array->capacity, sizeof(grpc_metadata)); + array->metadata = reinterpret_cast( + gpr_malloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { Handle current_key(keys->Get(i)->ToString()); NanUtf8String *utf8_key = new NanUtf8String(current_key); - string_handles->push_back(utf8_key); + string_handles->push_back(unique_ptr values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { Handle value = values->Get(j); - grpc_metadata *current = &array[array->count]; - grpc_call_error error; + grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; if (Buffer::HasInstance(value)) { current->value = Buffer::Data(value); current->value_length = Buffer::Length(value); - Persistent *handle = new Persistent(); - NanAssignPersistent(handle, object); - handles->push_back(handle); + Persistent handle; + NanAssignPersistent(handle, value); + handles->push_back(PersistentHolder(handle)); } else if (value->IsString()) { Handle string_value = value->ToString(); NanUtf8String *utf8_value = new NanUtf8String(string_value); - string_handles->push_back(utf8_value); + string_handles->push_back(unique_ptr(utf8_value)); current->value = **utf8_value; current->value_length = string_value->Length(); } else { @@ -114,6 +118,294 @@ bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array return true; } +Handle ParseMetadata(grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map size_map; + std::map index_map; + + for (unsigned int i = 0; i < length; i++) { + char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle metadata_object = NanNew(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle key_string = String::New(elem->key); + Handle array; + if (metadata_object->Has(key_string)) { + array = Handle::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +class Op { + public: + Handle GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(GetTypeString())); + } +}; + +class SendMetadataOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (!value->IsObject()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) { + return false; + } + out->data.send_initial_metadata.count = array.count; + out->data.send_initial_metadata.metadata = array.metadata; + return true; + } + protected: + char *GetTypeString() { + return "send metadata"; + } +}; + +class SendMessageOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (!Buffer::HasInstance(value)) { + return false; + } + out->.data.send_message = BufferToByteBuffer(obj->Get(type)); + NanAssignPersistent(handle, value); + handles->push_back(PersistentHolder(handle)); + } + protected: + char *GetTypeString() { + return "send message"; + } +}; + +class SendClientCloseOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + return true; + } + protected: + char *GetTypeString() { + return "client close"; + } +}; + +class SendServerStatusOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (value->IsObject()) { + return false; + } + Handle server_status = value->ToObject(); + if (!server_status->Get(NanNew("metadata"))->IsObject()) { + return false; + } + if (!server_status->Get(NanNew("code"))->IsUint32()) { + return false; + } + if (!server_status->Get(NanNew("details"))->IsString()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> + ToObject(), + &array, strings, handles)) { + return false; + } + out->data.send_status_from_server.trailing_metadata_count = array.count; + out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.status = + static_cast( + server_status->Get(NanNew("code"))->Uint32Value()); + NanUtf8String *str = new NanUtf8String( + server_status->Get(NanNew("details"))); + strings->push_back(unique_ptr(str)); + out->data.send_status_from_server.status_details = **str; + return true; + } + protected: + char *GetTypeString() { + return "send status"; + } +} + +class GetMetadataOp : public Op { + public: + GetMetadataOp() { + grpc_metadata_array_init(&recv_metadata); + } + + ~GetMetadataOp() { + grpc_metadata_array_destroy(&recv_metadata); + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(&recv_metadata)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_initial_metadata = &recv_metadata; + } + + protected: + char *GetTypeString() { + return "metadata"; + } + + private: + grpc_metadata_array recv_metadata; +}; + +class ReadMessageOp : public Op { + public: + ReadMessageOp() { + recv_message = NULL; + } + ~ReadMessageOp() { + if (recv_message != NULL) { + gpr_free(recv_message); + } + } + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_message = &recv_message; + } + + protected: + char *GetTypeString() { + return "read"; + } + + private: + grpc_byte_buffer *recv_message; +}; + +class ClientStatusOp : public Op { + public: + ClientStatusOp() { + grpc_metadata_array_init(&metadata); + status_details = NULL; + } + + ~ClientStatusOp() { + gprc_metadata_array_destroy(&metadata_array); + gpr_free(status_details); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_status_on_client.trailing_metadata = &metadata_array; + out->data.recv_status_on_client.status = &status; + out->data.recv_status_on_client.status_details = &status_details; + out->data.recv_status_on_client.status_details_capacity = &details_capacity; + } + + Handle GetNodeValue() const { + NanEscapableScope(); + Handle status_obj = NanNew(); + status_obj->Set(NanNew("code"), NanNew(status)); + if (event->data.finished.details != NULL) { + status_obj->Set(NanNew("details"), String::New(status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); + return NanEscapeScope(status_obj); + } + private: + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; + size_t details_capacity; +}; + +class ServerCloseResponseOp : public Op { + public: + Handle GetNodeValue() const { + NanEscapableScope(); + NanEscapeScope(NanNew(cancelled)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_close_on_server.cancelled = &cancelled; + } + + private: + int cancelled; +}; + +struct tag { + tag(NanCallback *callback, std::vector > *ops, + std::vector > *handles, + std::vector > *strings) : + callback(callback), ops(ops), handles(handles), strings(strings){ + } + ~tag() { + if (strings != null) { + for (std::vector::iterator it = strings.begin(); + it != strings.end(); ++it) { + delete *it; + } + delete strings; + } + delete callback; + delete ops; + if (handles != null) { + delete handles; + } + } +}; + Call::Call(grpc_call *call) : wrapped_call(call) {} Call::~Call() { grpc_call_destroy(wrapped_call); } @@ -123,28 +415,10 @@ void Call::Init(Handle exports) { Local tpl = FunctionTemplate::New(New); tpl->SetClassName(NanNew("Call")); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "addMetadata", - FunctionTemplate::New(AddMetadata)->GetFunction()); - NanSetPrototypeTemplate(tpl, "invoke", - FunctionTemplate::New(Invoke)->GetFunction()); - NanSetPrototypeTemplate(tpl, "serverAccept", - FunctionTemplate::New(ServerAccept)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "serverEndInitialMetadata", - FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startBatch", + FunctionTemplate::New(StartBatch)->GetFunction()); NanSetPrototypeTemplate(tpl, "cancel", FunctionTemplate::New(Cancel)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startWrite", - FunctionTemplate::New(StartWrite)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "startWriteStatus", - FunctionTemplate::New(StartWriteStatus)->GetFunction()); - NanSetPrototypeTemplate(tpl, "writesDone", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startReadMetadata", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startRead", - FunctionTemplate::New(StartRead)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); NanAssignPersistent(constructor, tpl->GetFunction()); constructor->Set(NanNew("WRITE_BUFFER_HINT"), @@ -225,211 +499,64 @@ NAN_METHOD(Call::StartBatch) { if (!args[1]->IsFunction()) { return NanThrowError("startBatch's second argument must be a callback"); } - vector *> *handles = new vector>(); - vector *strings = new vector(); - Persistent *handle; - Handle keys = args[0]->GetOwnPropertyNames(); + Call *call = ObjectWrap::Unwrap(args.This()); + std::vector > *handles = + new std::vector >(); + std::vector > *strings = + new std::vector >(); + Persistent handle; + Handle obj = args[0]->ToObject(); + Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); - grpc_op *ops = calloc(nops, sizeof(grpc_op)); - grpc_metadata_array array; - Handle server_status; - NanUtf8String *str; + grpc_op *ops = new grpc_op[nops]; + std::vector > *op_vector = new std::vector >(); for (unsigned int i = 0; i < nops; i++) { - if (!keys->Get(i)->IsUInt32()) { + Op *op; + if (!keys->Get(i)->IsUint32()) { return NanThrowError( "startBatch's first argument's keys must be integers"); } - uint32_t type = keys->Get(i)->UInt32Value(); - ops[i].op = type; + uint32_t type = keys->Get(i)->Uint32Value(); + ops[i].op = static_cast(type); switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: - if (!args[0]->Get(type)->IsObject()) { - return NanThrowError("metadata must be an object"); - } - if (!CreateMetadataArray(args[0]->Get(type)->ToObject(), &array, - strings, handles)) { - return NanThrowError("failed to parse metadata"); - } - ops[i].data.send_initial_metadata.count = array.count; - ops[i].data.send_initial_metadata.metadata = array.metadata; - break + op = new SendMetadataOp(); + break; case GRPC_OP_SEND_MESSAGE: - if (!Buffer::HasInstance(args[0]->Get(type))) { - return NanThrowError("message must be a Buffer"); - } - ops[i].data.send_message = BufferToByteBuffer(args[0]->Get(type)); - handle = new Persistent(); - NanAssignPersistent(*handle, args[0]->Get(type)); - handles->push_back(handle); + op = new SendMessageOp(); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + op = new SendClientCloseOp(); break; case GRPC_OP_SEND_STATUS_FROM_SERVER: - if (!args[0]->Get(type)->IsObject()) { - return NanThrowError("server status must be an object"); - } - server_status = args[0]->Get(type)->ToObject(); - if (!server_status->Get("metadata")->IsObject()) { - return NanThrowError("status metadata must be an object"); - } - if (!server_status->Get("code")->IsUInt32()) { - return NanThrowError("status code must be a positive integer"); - } - if (!server_status->Get("details")->IsString()) { - return NanThrowError("status details must be a string"); - } - if (!CreateMetadataArray(server_status->Get("metadata")->ToObject(), - &array, strings, handles)) { - return NanThrowError("Failed to parse status metadata"); - } - ops[i].data.send_status_from_server.trailing_metadata_count = - array.count; - ops[i].data.send_status_from_server.trailing_metadata = array.metadata; - ops[i].data.send_status_from_server.status = - server_status->Get("code")->UInt32Value(); - str = new NanUtf8String(server_status->Get("details")); - strings->push_back(str); - ops[i].data.send_status_from_server.status_details = **str; + op = new SendServerStatusOp(); break; case GRPC_OP_RECV_INITIAL_METADATA: - ops[i].data.recv_initial_metadata = malloc(sizeof(grpc_metadata_array)); - grpc_metadata_array_init(ops[i].data.recv_initial_metadata); + op = new GetMetadataOp(); break; case GRPC_OP_RECV_MESSAGE: - ops[i].data.recv_message = malloc(sizeof(grpc_byte_buffer*)); + op = new ReadMessageOp(); break; case GRPC_OP_RECV_STATUS_ON_CLIENT: - ops[i].data.recv_status_on_client.trailing_metadata = - malloc(sizeof(grpc_metadata_array)); - grpc_metadata_array_init(ops[i].data.recv_status_on_client); - ops[i].data.recv_status_on_client.status = - malloc(sizeof(grpc_status_code)); - ops[i].data.recv_status_on_client.status_details = - malloc(sizeof(char *)); - ops[i].data.recv_status_on_client.status_details_capacity = - malloc(sizeof(size_t)); + op = new ClientStatusOp(); break; case GRPC_OP_RECV_CLOSE_ON_SERVER: - ops[i].data.recv_close_on_server = malloc(sizeof(int)); + op = new ServerCloseResponseOp(); break; - + default: + return NanThrowError("Argument object had an unrecognized key"); } + op.ParseOp(obj.get(type), &ops[i], strings, handles); + op_vector.push_back(unique_ptr(op)); } -} - -NAN_METHOD(Call::AddMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("addMetadata can only be called on Call objects"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - if (!args[0]->IsObject()) { - return NanThrowTypeError("addMetadata's first argument must be an object"); - } - Handle metadata = args[0]->ToObject(); - Handle keys(metadata->GetOwnPropertyNames()); - for (unsigned int i = 0; i < keys->Length(); i++) { - Handle current_key(keys->Get(i)->ToString()); - if (!metadata->Get(current_key)->IsArray()) { - return NanThrowTypeError( - "addMetadata's first argument's values must be arrays"); - } - NanUtf8String utf8_key(current_key); - Handle values = Local::Cast(metadata->Get(current_key)); - for (unsigned int j = 0; j < values->Length(); j++) { - Handle value = values->Get(j); - grpc_metadata metadata; - grpc_call_error error; - metadata.key = *utf8_key; - if (Buffer::HasInstance(value)) { - metadata.value = Buffer::Data(value); - metadata.value_length = Buffer::Length(value); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else if (value->IsString()) { - Handle string_value = value->ToString(); - NanUtf8String utf8_value(string_value); - metadata.value = *utf8_value; - metadata.value_length = string_value->Length(); - gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key, - metadata.value, metadata.value_length); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else { - return NanThrowTypeError( - "addMetadata values must be strings or buffers"); - } - if (error != GRPC_CALL_OK) { - return NanThrowError("addMetadata failed", error); - } - } - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::Invoke) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("invoke can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("invoke's first argument must be a function"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("invoke's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError("invoke's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - unsigned int flags = args[3]->Uint32Value(); - grpc_call_error error = grpc_call_invoke_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("invoke failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerAccept) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("accept can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("accept's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_server_accept_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("serverAccept failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerEndInitialMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "serverEndInitialMetadata can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "serverEndInitialMetadata's second argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - unsigned int flags = args[1]->Uint32Value(); - grpc_call_error error = - grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags); + grpc_call_error error = grpc_call_start_batch( + call->wrapped_call, ops, nops, new struct tag(args[1].As(), + op_vector, nops, handles, + strings)); if (error != GRPC_CALL_OK) { - return NanThrowError("serverEndInitialMetadata failed", error); + return NanThrowError("startBatch failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } @@ -446,102 +573,5 @@ NAN_METHOD(Call::Cancel) { NanReturnUndefined(); } -NAN_METHOD(Call::StartWrite) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startWrite can only be called on Call objects"); - } - if (!Buffer::HasInstance(args[0])) { - return NanThrowTypeError("startWrite's first argument must be a Buffer"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("startWrite's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError( - "startWrite's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); - unsigned int flags = args[2]->Uint32Value(); - grpc_call_error error = grpc_call_start_write_old( - call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWrite failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartWriteStatus) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "startWriteStatus can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "startWriteStatus's first argument must be a status code"); - } - if (!args[1]->IsString()) { - return NanThrowTypeError( - "startWriteStatus's second argument must be a string"); - } - if (!args[2]->IsFunction()) { - return NanThrowTypeError( - "startWriteStatus's third argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - NanUtf8String details(args[1]); - grpc_call_error error = grpc_call_start_write_status_old( - call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details, - CreateTag(args[2], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWriteStatus failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::WritesDone) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("writesDone can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("writesDone's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_writes_done_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("writesDone failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartRead) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startRead can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("startRead's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_start_read_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startRead failed", error); - } - NanReturnUndefined(); -} - } // namespace node } // namespace grpc diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 1924a1bf428..6ae370d02f1 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -34,6 +34,8 @@ #ifndef NET_GRPC_NODE_CALL_H_ #define NET_GRPC_NODE_CALL_H_ +#include + #include #include #include "grpc/grpc.h" @@ -43,6 +45,44 @@ namespace grpc { namespace node { +using std::unique_ptr; + +class PersistentHolder { + public: + explicit PersistentHolder(v8::Persistent persist) : persist(persist) { + } + + ~PersistentHolder() { + persist.Dispose(); + } + + private: + v8::Persistent persist; +}; + +class Op { + public: + virtual Handle GetNodeValue() const = 0; + virtual bool ParseOp(v8::Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) = 0; + Handle GetOpType(); + + protected: + virtual char *GetTypeString(); +}; + +struct tag { + tag(NanCallback *callback, std::vector > *ops, + std::vector > *handles, + std::vector > *strings); + ~tag(); + NanCallback *callback; + std::vector > *ops; + std::vector > *handles; + std::vector > *strings; +}; + /* Wrapper class for grpc_call structs. */ class Call : public ::node::ObjectWrap { public: @@ -60,15 +100,8 @@ class Call : public ::node::ObjectWrap { Call &operator=(const Call &); static NAN_METHOD(New); - static NAN_METHOD(AddMetadata); - static NAN_METHOD(Invoke); - static NAN_METHOD(ServerAccept); - static NAN_METHOD(ServerEndInitialMetadata); + static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); - static NAN_METHOD(StartWrite); - static NAN_METHOD(StartWriteStatus); - static NAN_METHOD(WritesDone); - static NAN_METHOD(StartRead); static v8::Persistent constructor; // Used for typechecking instances of this javascript class static v8::Persistent fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 8de7db66d50..bb0e39180e8 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -37,7 +37,6 @@ #include "grpc/grpc.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" -#include "event.h" #include "tag.h" namespace grpc { @@ -58,6 +57,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); + if (result->data.op_complete != GRPC_OP_OK) { + SetErrorMessage("The batch encountered an error"); + } } grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } @@ -75,14 +77,26 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); - NanCallback event_callback(GetTagHandle(result->tag).As()); - Handle argv[] = {CreateEventObject(result)}; + NanCallback callback = GetTagCallback(result->tag); + Handle argv[] = {NanNull(), GetNodeValue(result->tag)}; DestroyTag(result->tag); grpc_event_finish(result); result = NULL; - event_callback.Call(1, argv); + callback.Call(2, argv); +} + +void CompletionQueueAsyncWorker::HandleErrorCallback() { + NanScope(); + NanCallback callback = GetTagCallback(result->tag); + Handle argv[] = {NanError(ErrorMessage())}; + + DestroyTag(result->tag); + grpc_event_finish(result); + result = NULL; + + callback.Call(1, argv); } } // namespace node diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 2c928b7024f..c04a303283a 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { completion_queue_next */ void HandleOKCallback(); + void HandleErrorCallback(); + private: grpc_event *result; diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 6b8ccef9b17..c0ccf1f381c 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -31,6 +31,8 @@ * */ +#include + #include "server.h" #include @@ -49,6 +51,7 @@ namespace grpc { namespace node { +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Boolean; @@ -67,6 +70,45 @@ using v8::Value; Persistent Server::constructor; Persistent Server::fun_tpl; +class NewCallOp : public Op { + public: + NewCallOp() { + call = NULL; + grpc_call_details_init(&details); + grpc_metadata_array_init(&request_metadata); + } + + ~NewCallOp() { + grpc_call_details_destroy(&details); + grpc_metadata_array_destroy(&details); + } + + Handle GetNodeValue() const { + NanEscapableScope(); + if (*call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle obj = NanNew(); + obj->Set(NanNew("call"), Call::WrapStruct(call)); + obj->Set(NanNew("method"), NanNew(details.method)); + obj->Set(NanNew("host"), NanNew(details.host)); + obj->Set(NanNew("deadline"), + NanNew(TimespecToMilliseconds(details.deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata)); + return NanEscapeScope(obj); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > strings, + std::vector > handles) { + return true; + } + + grpc_call *call; + grpc_call_details details; + grpc_metadata_array request_metadata; +} + Server::Server(grpc_server *server) : wrapped_server(server) {} Server::~Server() { grpc_server_destroy(wrapped_server); } @@ -175,13 +217,16 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_server_request_call_old( - server->wrapped_server, CreateTag(args[0], NanNull())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { + Op *op = new NewCallOp(); + std::vector > *ops = { unique_ptr(op) }; + grpc_call_error error = grpc_server_request_call( + server->wrapped_server, &op->call, &op->details, &op->metadata, + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(args[0].As(), ops, NULL, NULL)); + if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc index 4c41c3d20c6..27baa94a8ea 100644 --- a/src/node/ext/tag.cc +++ b/src/node/ext/tag.cc @@ -89,6 +89,7 @@ class OpResponse { explicit OpResponse(char *name): name(name) { } virtual Handle GetNodeValue() const = 0; + virtual bool ParseOp() = 0; Handle GetOpType() const { NanEscapableScope(); return NanEscapeScope(NanNew(name)); @@ -136,16 +137,23 @@ class MessageResponse : public OpResponse { } private: - grpc_byte_buffer **recv_message + grpc_byte_buffer **recv_message; }; +switch () { +case GRPC_RECV_CLIENT_STATUS: + op = new ClientStatusResponse; + break; +} + + class ClientStatusResponse : public OpResponse { public: - explicit ClientStatusResponse(grpc_metadata_array *metadata_array, - grpc_status_code *status, - char **status_details): - metadata_array(metadata_array), status(status), - status_details(status_details), OpResponse("status") { + explicit ClientStatusResponse(): + OpResponse("status") { + } + + bool ParseOp(Handle obj, grpc_op *out) { } Handle GetNodeValue() const { @@ -159,9 +167,9 @@ class ClientStatusResponse : public OpResponse { return NanEscapeScope(status_obj); } private: - grpc_metadata_array *metadata_array; - grpc_status_code *status; - char **status_details; + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; }; class ServerCloseResponse : public OpResponse { @@ -208,22 +216,35 @@ class NewCallResponse : public OpResponse { } struct tag { - tag(NanCallback *callback, std::vector *responses) : - callback(callback), repsonses(responses) { + tag(NanCallback *callback, std::vector *responses, + std::vector> *handles, + std::vector *strings) : + callback(callback), repsonses(responses), handles(handles), + strings(strings){ } ~tag() { for (std::vector::iterator it = responses->begin(); it != responses->end(); ++it) { delete *it; } + for (std::vector::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; + } delete callback; delete responses; + delete handles; + delete strings; } NanCallback *callback; std::vector *responses; + std::vector> *handles; + std::vector *strings; }; -void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { +void *CreateTag(Handle callback, grpc_op *ops, size_t nops, + std::vector> *handles, + std::vector *strings) { NanScope(); NanCallback *cb = new NanCallback(callback); vector *responses = new vector(); @@ -264,7 +285,7 @@ void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { } responses->push_back(resp); } - struct tag *tag_struct = new struct tag(cb, responses); + struct tag *tag_struct = new struct tag(cb, responses, handles, strings); return reinterpret_cast(tag_struct); } @@ -280,7 +301,7 @@ void *CreateTag(Handle callback, grpc_call **call, return reinterpret_cast(tag_struct); } -NanCallback GetCallback(void *tag) { +NanCallback GetTagCallback(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); return NanEscapeScope(*tag_struct->callback); diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h index 5c709743237..9ff8703b959 100644 --- a/src/node/ext/tag.h +++ b/src/node/ext/tag.h @@ -34,6 +34,9 @@ #ifndef NET_GRPC_NODE_TAG_H_ #define NET_GRPC_NODE_TAG_H_ +#include + + #include #include #include @@ -41,9 +44,11 @@ namespace grpc { namespace node { -/* Create a void* tag that can be passed to grpc_call_start_batch from a callback - function and an ops array */ -void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops); +/* Create a void* tag that can be passed to grpc_call_start_batch from a + callback function and an ops array */ +void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops, + std::vector > *handles, + std::vector *strings); /* Create a void* tag that can be passed to grpc_server_request_call from a callback and the various out parameters to that function */ @@ -55,7 +60,7 @@ void *CreateTag(v8::Handle callback, grpc_call **call, NanCallback GetCallback(void *tag); /* Get the combined output value from the tag */ -v8::Handle GetNodevalue(void *tag); +v8::Handle GetNodeValue(void *tag); /* Destroy the tag and all resources it is holding. It is illegal to call any of these other functions on a tag after it has been destroyed. */