From a9b99c93e3ac7557eb0ad52c83fde013518eee92 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 5 Feb 2015 17:42:01 -0800 Subject: [PATCH] Part of the update to the new API --- src/node/ext/call.cc | 155 +++++++++++++++++++++- src/node/ext/node_grpc.cc | 31 +++++ src/node/ext/tag.cc | 263 +++++++++++++++++++++++++++++++++----- src/node/ext/tag.h | 28 ++-- 4 files changed, 434 insertions(+), 43 deletions(-) diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 23aead07b29..d2e930bc0ff 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,6 +31,8 @@ * */ +#include + #include #include "grpc/support/log.h" @@ -68,6 +70,50 @@ using v8::Value; Persistent Call::constructor; Persistent Call::fun_tpl; +bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array + vector *string_handles, + vector*> *handles) { + NanScope(); + Handle keys(metadata->GetOwnPropertyNames()); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle current_key(keys->Get(i)->ToString()); + if (!metadata->Get(current_key)->IsArray()) { + return false; + } + array->capacity += Local::Cast(metadata->Get(current_key))->Length(); + } + array->metadata = calloc(array->capacity, sizeof(grpc_metadata)); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle current_key(keys->Get(i)->ToString()); + NanUtf8String *utf8_key = new NanUtf8String(current_key); + string_handles->push_back(utf8_key); + Handle values = Local::Cast(metadata->Get(current_key)); + for (unsigned int j = 0; j < values->Length(); j++) { + Handle value = values->Get(j); + grpc_metadata *current = &array[array->count]; + grpc_call_error error; + current->key = **utf8_key; + if (Buffer::HasInstance(value)) { + current->value = Buffer::Data(value); + current->value_length = Buffer::Length(value); + Persistent *handle = new Persistent(); + NanAssignPersistent(handle, object); + handles->push_back(handle); + } else if (value->IsString()) { + Handle string_value = value->ToString(); + NanUtf8String *utf8_value = new NanUtf8String(string_value); + string_handles->push_back(utf8_value); + current->value = **utf8_value; + current->value_length = string_value->Length(); + } else { + return false; + } + array->count += 1; + } + } + return true; +} + Call::Call(grpc_call *call) : wrapped_call(call) {} Call::~Call() { grpc_call_destroy(wrapped_call); } @@ -152,9 +198,9 @@ NAN_METHOD(Call::New) { NanUtf8String method(args[1]); double deadline = args[2]->NumberValue(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); - grpc_call *wrapped_call = grpc_channel_create_call_old( - wrapped_channel, *method, channel->GetHost(), - MillisecondsToTimespec(deadline)); + grpc_call *wrapped_call = grpc_channel_create_call( + wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method, + channel->GetHost(), MillisecondsToTimespec(deadline)); call = new Call(wrapped_call); args.This()->SetHiddenValue(String::NewSymbol("channel_"), channel_object); @@ -168,6 +214,109 @@ NAN_METHOD(Call::New) { } } +NAN_METHOD(Call::StartBatch) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startBatch can only be called on Call objects"); + } + if (!args[0]->IsObject()) { + return NanThrowError("startBatch's first argument must be an object"); + } + if (!args[1]->IsFunction()) { + return NanThrowError("startBatch's second argument must be a callback"); + } + vector *> *handles = new vector>(); + vector *strings = new vector(); + Persistent *handle; + Handle keys = args[0]->GetOwnPropertyNames(); + size_t nops = keys->Length(); + grpc_op *ops = calloc(nops, sizeof(grpc_op)); + grpc_metadata_array array; + Handle server_status; + NanUtf8String *str; + for (unsigned int i = 0; i < nops; i++) { + if (!keys->Get(i)->IsUInt32()) { + return NanThrowError( + "startBatch's first argument's keys must be integers"); + } + uint32_t type = keys->Get(i)->UInt32Value(); + ops[i].op = type; + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + if (!args[0]->Get(type)->IsObject()) { + return NanThrowError("metadata must be an object"); + } + if (!CreateMetadataArray(args[0]->Get(type)->ToObject(), &array, + strings, handles)) { + return NanThrowError("failed to parse metadata"); + } + ops[i].data.send_initial_metadata.count = array.count; + ops[i].data.send_initial_metadata.metadata = array.metadata; + break + case GRPC_OP_SEND_MESSAGE: + if (!Buffer::HasInstance(args[0]->Get(type))) { + return NanThrowError("message must be a Buffer"); + } + ops[i].data.send_message = BufferToByteBuffer(args[0]->Get(type)); + handle = new Persistent(); + NanAssignPersistent(*handle, args[0]->Get(type)); + handles->push_back(handle); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + if (!args[0]->Get(type)->IsObject()) { + return NanThrowError("server status must be an object"); + } + server_status = args[0]->Get(type)->ToObject(); + if (!server_status->Get("metadata")->IsObject()) { + return NanThrowError("status metadata must be an object"); + } + if (!server_status->Get("code")->IsUInt32()) { + return NanThrowError("status code must be a positive integer"); + } + if (!server_status->Get("details")->IsString()) { + return NanThrowError("status details must be a string"); + } + if (!CreateMetadataArray(server_status->Get("metadata")->ToObject(), + &array, strings, handles)) { + return NanThrowError("Failed to parse status metadata"); + } + ops[i].data.send_status_from_server.trailing_metadata_count = + array.count; + ops[i].data.send_status_from_server.trailing_metadata = array.metadata; + ops[i].data.send_status_from_server.status = + server_status->Get("code")->UInt32Value(); + str = new NanUtf8String(server_status->Get("details")); + strings->push_back(str); + ops[i].data.send_status_from_server.status_details = **str; + break; + case GRPC_OP_RECV_INITIAL_METADATA: + ops[i].data.recv_initial_metadata = malloc(sizeof(grpc_metadata_array)); + grpc_metadata_array_init(ops[i].data.recv_initial_metadata); + break; + case GRPC_OP_RECV_MESSAGE: + ops[i].data.recv_message = malloc(sizeof(grpc_byte_buffer*)); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + ops[i].data.recv_status_on_client.trailing_metadata = + malloc(sizeof(grpc_metadata_array)); + grpc_metadata_array_init(ops[i].data.recv_status_on_client); + ops[i].data.recv_status_on_client.status = + malloc(sizeof(grpc_status_code)); + ops[i].data.recv_status_on_client.status_details = + malloc(sizeof(char *)); + ops[i].data.recv_status_on_client.status_details_capacity = + malloc(sizeof(size_t)); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + ops[i].data.recv_close_on_server = malloc(sizeof(int)); + break; + + } + } +} + NAN_METHOD(Call::AddMetadata) { NanScope(); if (!HasInstance(args.This())) { diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index bc1dfaf8996..c9388940ad2 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -161,6 +161,36 @@ void InitCompletionTypeConstants(Handle exports) { completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); } +void InitOpTypeConstants(Handle exports) { + NanScope(); + Handle op_type = Object::New(); + exports->Set(NanNew("opType"), op_type); + Handle SEND_INITIAL_METADATA( + NanNew(GRPC_OP_SEND_INITIAL_METADATA)); + op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA); + Handle SEND_MESSAGE( + NanNew(GRPC_OP_SEND_MESSAGE)); + op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE); + Handle SEND_CLOSE_FROM_CLIENT( + NanNew(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT); + Handle SEND_STATUS_FROM_SERVER( + NanNew(GRPC_OP_SEND_STATUS_FROM_SERVER)); + op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER); + Handle RECV_INITIAL_METADATA( + NanNew(GRPC_OP_RECV_INITIAL_METADATA)); + op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA); + Handle RECV_MESSAGE( + NanNew(GRPC_OP_RECV_MESSAGE)); + op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE); + Handle RECV_STATUS_ON_CLIENT( + NanNew(GRPC_OP_RECV_STATUS_ON_CLIENT)); + op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT); + Handle RECV_CLOSE_ON_SERVER( + NanNew(GRPC_OP_RECV_CLOSE_ON_SERVER)); + op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); +} + void init(Handle exports) { NanScope(); grpc_init(); @@ -168,6 +198,7 @@ void init(Handle exports) { InitCallErrorConstants(exports); InitOpErrorConstants(exports); InitCompletionTypeConstants(exports); + InitOpTypeConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc index dc8e523e12e..4c41c3d20c6 100644 --- a/src/node/ext/tag.cc +++ b/src/node/ext/tag.cc @@ -31,68 +31,271 @@ * */ +#include +#include + +#include #include #include #include #include "tag.h" +#include "call.h" namespace grpc { namespace node { +using v8::Boolean; +using v8::Function; using v8::Handle; using v8::HandleScope; using v8::Persistent; using v8::Value; -struct tag { - tag(Persistent *tag, Persistent *call) - : persist_tag(tag), persist_call(call) {} +Handle ParseMetadata(grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map size_map; + std::map index_map; + + for (unsigned int i = 0; i < length; i++) { + char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle metadata_object = NanNew(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle key_string = String::New(elem->key); + Handle array; + if (metadata_object->Has(key_string)) { + array = Handle::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +class OpResponse { + public: + explicit OpResponse(char *name): name(name) { + } + virtual Handle GetNodeValue() const = 0; + Handle GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(name)); + } + + private: + char *name; +}; + +class SendResponse : public OpResponse { + public: + explicit SendResponse(char *name): OpResponse(name) { + } + + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } +} + +class MetadataResponse : public OpResponse { + public: + explicit MetadataResponse(grpc_metadata_array *recv_metadata): + recv_metadata(recv_metadata), OpResponse("metadata") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(recv_metadata)); + } + + private: + grpc_metadata_array *recv_metadata; +}; + +class MessageResponse : public OpResponse { + public: + explicit MessageResponse(grpc_byte_buffer **recv_message): + recv_message(recv_message), OpResponse("read") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + } + + private: + grpc_byte_buffer **recv_message +}; + +class ClientStatusResponse : public OpResponse { + public: + explicit ClientStatusResponse(grpc_metadata_array *metadata_array, + grpc_status_code *status, + char **status_details): + metadata_array(metadata_array), status(status), + status_details(status_details), OpResponse("status") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + Handle status_obj = NanNew(); + status_obj->Set(NanNew("code"), NanNew(*status)); + if (event->data.finished.details != NULL) { + status_obj->Set(NanNew("details"), String::New(*status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(metadata_array)); + return NanEscapeScope(status_obj); + } + private: + grpc_metadata_array *metadata_array; + grpc_status_code *status; + char **status_details; +}; + +class ServerCloseResponse : public OpResponse { + public: + explicit ServerCloseResponse(int *cancelled): cancelled(cancelled), + OpResponse("cancelled") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + NanEscapeScope(NanNew(*cancelled)); + } + + private: + int *cancelled; +}; + +class NewCallResponse : public OpResponse { + public: + explicit NewCallResponse(grpc_call **call, grpc_call_details *details, + grpc_metadata_array *request_metadata) : + call(call), details(details), request_metadata(request_metadata), + OpResponse("call"){ + } + + Handle GetNodeValue() const { + NanEscapableScope(); + if (*call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle obj = NanNew(); + obj->Set(NanNew("call"), Call::WrapStruct(*call)); + obj->Set(NanNew("method"), NanNew(details->method)); + obj->Set(NanNew("host"), NanNew(details->host)); + obj->Set(NanNew("deadline"), + NanNew(TimespecToMilliseconds(details->deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(request_metadata)); + return NanEscapeScope(obj); + } + private: + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *request_metadata; +} +struct tag { + tag(NanCallback *callback, std::vector *responses) : + callback(callback), repsonses(responses) { + } ~tag() { - persist_tag->Dispose(); - if (persist_call != NULL) { - persist_call->Dispose(); + for (std::vector::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; } + delete callback; + delete responses; } - Persistent *persist_tag; - Persistent *persist_call; + NanCallback *callback; + std::vector *responses; }; -void *CreateTag(Handle tag, Handle call) { +void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { NanScope(); - Persistent *persist_tag = new Persistent(); - NanAssignPersistent(*persist_tag, tag); - Persistent *persist_call; - if (call->IsNull() || call->IsUndefined()) { - persist_call = NULL; - } else { - persist_call = new Persistent(); - NanAssignPersistent(*persist_call, call); - } - struct tag *tag_struct = new struct tag(persist_tag, persist_call); + NanCallback *cb = new NanCallback(callback); + vector *responses = new vector(); + for (size_t i = 0; i < nops; i++) { + grpc_op *op = &ops[i]; + OpResponse *resp; + // Switching on the TYPE of the op + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + resp = new SendResponse("send metadata"); + break; + case GRPC_OP_SEND_MESSAGE: + resp = new SendResponse("write"); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + resp = new SendResponse("client close"); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + resp = new SendResponse("server close"); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + resp = new MetadataResponse(op->data.recv_initial_metadata); + break; + case GRPC_OP_RECV_MESSAGE: + resp = new MessageResponse(op->data.recv_message); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + resp = new ClientStatusResponse( + op->data.recv_status_on_client.trailing_metadata, + op->data.recv_status_on_client.status, + op->data.recv_status_on_client.status_details); + break; + case GRPC_RECV_CLOSE_ON_SERVER: + resp = new ServerCloseResponse(op->data.recv_close_on_server.cancelled); + break; + default: + continue; + } + responses->push_back(resp); + } + struct tag *tag_struct = new struct tag(cb, responses); return reinterpret_cast(tag_struct); } -Handle GetTagHandle(void *tag) { +void *CreateTag(Handle callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata) { NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast(tag); - Handle tag_value = NanNew(*tag_struct->persist_tag); - return NanEscapeScope(tag_value); + NanCallback *cb = new NanCallback(callback); + vector *responses = new vector(); + OpResponse *resp = new NewCallResponse(call, details, request_metadata); + responses->push_back(resp); + struct tag *tag_struct = new struct tag(cb, responses); + return reinterpret_cast(tag_struct); } -bool TagHasCall(void *tag) { +NanCallback GetCallback(void *tag) { + NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); - return tag_struct->persist_call != NULL; + return NanEscapeScope(*tag_struct->callback); } -Handle TagGetCall(void *tag) { +Handle GetNodeValue(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); - if (tag_struct->persist_call == NULL) { - return NanEscapeScope(NanNull()); + Handle obj = NanNew(); + for (std::vector::iterator it = tag_struct->responses->begin(); + it != tag_struct->responses->end(); ++it) { + OpResponse *resp = *it; + obj->Set(resp->GetOpType(), resp->GetNodeValue()); } - Handle call_value = NanNew(*tag_struct->persist_call); - return NanEscapeScope(call_value); + return NanEscapeScope(obj); } void DestroyTag(void *tag) { delete reinterpret_cast(tag); } diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h index bdb09252d98..5c709743237 100644 --- a/src/node/ext/tag.h +++ b/src/node/ext/tag.h @@ -34,21 +34,29 @@ #ifndef NET_GRPC_NODE_TAG_H_ #define NET_GRPC_NODE_TAG_H_ +#include #include +#include namespace grpc { namespace node { -/* Create a void* tag that can be passed to various grpc_call functions from - a javascript value and the javascript wrapper for the call. The call can be - null. */ -void *CreateTag(v8::Handle tag, v8::Handle call); -/* Return the javascript value stored in the tag */ -v8::Handle GetTagHandle(void *tag); -/* Returns true if the call was set (non-null) when the tag was created */ -bool TagHasCall(void *tag); -/* Returns the javascript wrapper for the call associated with this tag */ -v8::Handle TagGetCall(void *call); +/* Create a void* tag that can be passed to grpc_call_start_batch from a callback + function and an ops array */ +void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops); + +/* Create a void* tag that can be passed to grpc_server_request_call from a + callback and the various out parameters to that function */ +void *CreateTag(v8::Handle callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata); + +/* Get the callback from the tag */ +NanCallback GetCallback(void *tag); + +/* Get the combined output value from the tag */ +v8::Handle GetNodevalue(void *tag); + /* Destroy the tag and all resources it is holding. It is illegal to call any of these other functions on a tag after it has been destroyed. */ void DestroyTag(void *tag);