|
|
|
@ -31,24 +31,26 @@ |
|
|
|
|
* |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#include <memory> |
|
|
|
|
#include <vector> |
|
|
|
|
|
|
|
|
|
#include <node.h> |
|
|
|
|
|
|
|
|
|
#include "grpc/support/log.h" |
|
|
|
|
#include "grpc/grpc.h" |
|
|
|
|
#include "grpc/support/alloc.h" |
|
|
|
|
#include "grpc/support/time.h" |
|
|
|
|
#include "byte_buffer.h" |
|
|
|
|
#include "call.h" |
|
|
|
|
#include "channel.h" |
|
|
|
|
#include "completion_queue_async_worker.h" |
|
|
|
|
#include "timeval.h" |
|
|
|
|
#include "tag.h" |
|
|
|
|
|
|
|
|
|
namespace grpc { |
|
|
|
|
namespace node { |
|
|
|
|
|
|
|
|
|
using ::node::Buffer; |
|
|
|
|
using std::unique_ptr; |
|
|
|
|
using v8::Arguments; |
|
|
|
|
using v8::Array; |
|
|
|
|
using v8::Exception; |
|
|
|
@ -70,9 +72,11 @@ using v8::Value; |
|
|
|
|
Persistent<Function> Call::constructor; |
|
|
|
|
Persistent<FunctionTemplate> Call::fun_tpl; |
|
|
|
|
|
|
|
|
|
bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array |
|
|
|
|
vector<NanUtf8String*> *string_handles, |
|
|
|
|
vector<Persistent<Value>*> *handles) { |
|
|
|
|
|
|
|
|
|
bool CreateMetadataArray( |
|
|
|
|
Handle<Object> metadata, grpc_metadata_array *array, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *string_handles, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
NanScope(); |
|
|
|
|
Handle<Array> keys(metadata->GetOwnPropertyNames()); |
|
|
|
|
for (unsigned int i = 0; i < keys->Length(); i++) { |
|
|
|
@ -82,27 +86,27 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array |
|
|
|
|
} |
|
|
|
|
array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length(); |
|
|
|
|
} |
|
|
|
|
array->metadata = calloc(array->capacity, sizeof(grpc_metadata)); |
|
|
|
|
array->metadata = reinterpret_cast<grpc_metadata*>( |
|
|
|
|
gpr_malloc(array->capacity * sizeof(grpc_metadata))); |
|
|
|
|
for (unsigned int i = 0; i < keys->Length(); i++) { |
|
|
|
|
Handle<String> current_key(keys->Get(i)->ToString()); |
|
|
|
|
NanUtf8String *utf8_key = new NanUtf8String(current_key); |
|
|
|
|
string_handles->push_back(utf8_key); |
|
|
|
|
string_handles->push_back(unique_ptr<NanUtf8String(utf8_key)); |
|
|
|
|
Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); |
|
|
|
|
for (unsigned int j = 0; j < values->Length(); j++) { |
|
|
|
|
Handle<Value> value = values->Get(j); |
|
|
|
|
grpc_metadata *current = &array[array->count]; |
|
|
|
|
grpc_call_error error; |
|
|
|
|
grpc_metadata *current = &array->metadata[array->count]; |
|
|
|
|
current->key = **utf8_key; |
|
|
|
|
if (Buffer::HasInstance(value)) { |
|
|
|
|
current->value = Buffer::Data(value); |
|
|
|
|
current->value_length = Buffer::Length(value); |
|
|
|
|
Persistent<Value> *handle = new Persistent<Value>(); |
|
|
|
|
NanAssignPersistent(handle, object); |
|
|
|
|
handles->push_back(handle); |
|
|
|
|
Persistent<Value> handle; |
|
|
|
|
NanAssignPersistent(handle, value); |
|
|
|
|
handles->push_back(PersistentHolder(handle)); |
|
|
|
|
} else if (value->IsString()) { |
|
|
|
|
Handle<String> string_value = value->ToString(); |
|
|
|
|
NanUtf8String *utf8_value = new NanUtf8String(string_value); |
|
|
|
|
string_handles->push_back(utf8_value); |
|
|
|
|
string_handles->push_back(unique_ptr<NanUtf8String>(utf8_value)); |
|
|
|
|
current->value = **utf8_value; |
|
|
|
|
current->value_length = string_value->Length(); |
|
|
|
|
} else { |
|
|
|
@ -114,6 +118,294 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Handle<Value> ParseMetadata(grpc_metadata_array *metadata_array) { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
grpc_metadata *metadata_elements = metadata_array->metadata; |
|
|
|
|
size_t length = metadata_array->count; |
|
|
|
|
std::map<char*, size_t> size_map; |
|
|
|
|
std::map<char*, size_t> index_map; |
|
|
|
|
|
|
|
|
|
for (unsigned int i = 0; i < length; i++) { |
|
|
|
|
char *key = metadata_elements[i].key; |
|
|
|
|
if (size_map.count(key)) { |
|
|
|
|
size_map[key] += 1; |
|
|
|
|
} |
|
|
|
|
index_map[key] = 0; |
|
|
|
|
} |
|
|
|
|
Handle<Object> metadata_object = NanNew<Object>(); |
|
|
|
|
for (unsigned int i = 0; i < length; i++) { |
|
|
|
|
grpc_metadata* elem = &metadata_elements[i]; |
|
|
|
|
Handle<String> key_string = String::New(elem->key); |
|
|
|
|
Handle<Array> array; |
|
|
|
|
if (metadata_object->Has(key_string)) { |
|
|
|
|
array = Handle<Array>::Cast(metadata_object->Get(key_string)); |
|
|
|
|
} else { |
|
|
|
|
array = NanNew<Array>(size_map[elem->key]); |
|
|
|
|
metadata_object->Set(key_string, array); |
|
|
|
|
} |
|
|
|
|
array->Set(index_map[elem->key], |
|
|
|
|
MakeFastBuffer( |
|
|
|
|
NanNewBufferHandle(elem->value, elem->value_length))); |
|
|
|
|
index_map[elem->key] += 1; |
|
|
|
|
} |
|
|
|
|
return NanEscapeScope(metadata_object); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetOpType() const { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(NanNew(GetTypeString())); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class SendMetadataOp : public Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetNodeValue() { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(NanTrue()); |
|
|
|
|
} |
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
if (!value->IsObject()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
grpc_metadata_array array; |
|
|
|
|
if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
out->data.send_initial_metadata.count = array.count; |
|
|
|
|
out->data.send_initial_metadata.metadata = array.metadata; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "send metadata"; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class SendMessageOp : public Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetNodeValue() { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(NanTrue()); |
|
|
|
|
} |
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
if (!Buffer::HasInstance(value)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
out->.data.send_message = BufferToByteBuffer(obj->Get(type)); |
|
|
|
|
NanAssignPersistent(handle, value); |
|
|
|
|
handles->push_back(PersistentHolder(handle)); |
|
|
|
|
} |
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "send message"; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class SendClientCloseOp : public Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetNodeValue() { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(NanTrue()); |
|
|
|
|
} |
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "client close"; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class SendServerStatusOp : public Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetNodeValue() { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(NanTrue()); |
|
|
|
|
} |
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
if (value->IsObject()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
Handle<Object> server_status = value->ToObject(); |
|
|
|
|
if (!server_status->Get(NanNew("metadata"))->IsObject()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!server_status->Get(NanNew("code"))->IsUint32()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!server_status->Get(NanNew("details"))->IsString()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
grpc_metadata_array array; |
|
|
|
|
if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> |
|
|
|
|
ToObject(), |
|
|
|
|
&array, strings, handles)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
out->data.send_status_from_server.trailing_metadata_count = array.count; |
|
|
|
|
out->data.send_status_from_server.trailing_metadata = array.metadata; |
|
|
|
|
out->data.send_status_from_server.status = |
|
|
|
|
static_cast<grpc_status_code>( |
|
|
|
|
server_status->Get(NanNew("code"))->Uint32Value()); |
|
|
|
|
NanUtf8String *str = new NanUtf8String( |
|
|
|
|
server_status->Get(NanNew("details"))); |
|
|
|
|
strings->push_back(unique_ptr<NanUtf8String>(str)); |
|
|
|
|
out->data.send_status_from_server.status_details = **str; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "send status"; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class GetMetadataOp : public Op { |
|
|
|
|
public: |
|
|
|
|
GetMetadataOp() { |
|
|
|
|
grpc_metadata_array_init(&recv_metadata); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
~GetMetadataOp() { |
|
|
|
|
grpc_metadata_array_destroy(&recv_metadata); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Handle<Value> GetNodeValue() const { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(ParseMetadata(&recv_metadata)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
out->data.recv_initial_metadata = &recv_metadata; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "metadata"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
grpc_metadata_array recv_metadata; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class ReadMessageOp : public Op { |
|
|
|
|
public: |
|
|
|
|
ReadMessageOp() { |
|
|
|
|
recv_message = NULL; |
|
|
|
|
} |
|
|
|
|
~ReadMessageOp() { |
|
|
|
|
if (recv_message != NULL) { |
|
|
|
|
gpr_free(recv_message); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Handle<Value> GetNodeValue() const { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
return NanEscapeScope(ByteBufferToBuffer(*recv_message)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
out->data.recv_message = &recv_message; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
char *GetTypeString() { |
|
|
|
|
return "read"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
grpc_byte_buffer *recv_message; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class ClientStatusOp : public Op { |
|
|
|
|
public: |
|
|
|
|
ClientStatusOp() { |
|
|
|
|
grpc_metadata_array_init(&metadata); |
|
|
|
|
status_details = NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
~ClientStatusOp() { |
|
|
|
|
gprc_metadata_array_destroy(&metadata_array); |
|
|
|
|
gpr_free(status_details); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
out->data.recv_status_on_client.trailing_metadata = &metadata_array; |
|
|
|
|
out->data.recv_status_on_client.status = &status; |
|
|
|
|
out->data.recv_status_on_client.status_details = &status_details; |
|
|
|
|
out->data.recv_status_on_client.status_details_capacity = &details_capacity; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Handle<Value> GetNodeValue() const { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
Handle<Object> status_obj = NanNew<Object>(); |
|
|
|
|
status_obj->Set(NanNew("code"), NanNew<Number>(status)); |
|
|
|
|
if (event->data.finished.details != NULL) { |
|
|
|
|
status_obj->Set(NanNew("details"), String::New(status_details)); |
|
|
|
|
} |
|
|
|
|
status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); |
|
|
|
|
return NanEscapeScope(status_obj); |
|
|
|
|
} |
|
|
|
|
private: |
|
|
|
|
grpc_metadata_array metadata_array; |
|
|
|
|
grpc_status_code status; |
|
|
|
|
char *status_details; |
|
|
|
|
size_t details_capacity; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class ServerCloseResponseOp : public Op { |
|
|
|
|
public: |
|
|
|
|
Handle<Value> GetNodeValue() const { |
|
|
|
|
NanEscapableScope(); |
|
|
|
|
NanEscapeScope(NanNew<Boolean>(cancelled)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool ParseOp(Handle<Value> value, grpc_op *out, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles) { |
|
|
|
|
out->data.recv_close_on_server.cancelled = &cancelled; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
int cancelled; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct tag { |
|
|
|
|
tag(NanCallback *callback, std::vector<unique_ptr<Op> > *ops, |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles, |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings) : |
|
|
|
|
callback(callback), ops(ops), handles(handles), strings(strings){ |
|
|
|
|
} |
|
|
|
|
~tag() { |
|
|
|
|
if (strings != null) { |
|
|
|
|
for (std::vector<NanUtf8String *>::iterator it = strings.begin(); |
|
|
|
|
it != strings.end(); ++it) { |
|
|
|
|
delete *it; |
|
|
|
|
} |
|
|
|
|
delete strings; |
|
|
|
|
} |
|
|
|
|
delete callback; |
|
|
|
|
delete ops; |
|
|
|
|
if (handles != null) { |
|
|
|
|
delete handles; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
Call::Call(grpc_call *call) : wrapped_call(call) {} |
|
|
|
|
|
|
|
|
|
Call::~Call() { grpc_call_destroy(wrapped_call); } |
|
|
|
@ -123,28 +415,10 @@ void Call::Init(Handle<Object> exports) { |
|
|
|
|
Local<FunctionTemplate> tpl = FunctionTemplate::New(New); |
|
|
|
|
tpl->SetClassName(NanNew("Call")); |
|
|
|
|
tpl->InstanceTemplate()->SetInternalFieldCount(1); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "addMetadata", |
|
|
|
|
FunctionTemplate::New(AddMetadata)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "invoke", |
|
|
|
|
FunctionTemplate::New(Invoke)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "serverAccept", |
|
|
|
|
FunctionTemplate::New(ServerAccept)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate( |
|
|
|
|
tpl, "serverEndInitialMetadata", |
|
|
|
|
FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "startBatch", |
|
|
|
|
FunctionTemplate::New(StartBatch)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "cancel", |
|
|
|
|
FunctionTemplate::New(Cancel)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "startWrite", |
|
|
|
|
FunctionTemplate::New(StartWrite)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate( |
|
|
|
|
tpl, "startWriteStatus", |
|
|
|
|
FunctionTemplate::New(StartWriteStatus)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "writesDone", |
|
|
|
|
FunctionTemplate::New(WritesDone)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "startReadMetadata", |
|
|
|
|
FunctionTemplate::New(WritesDone)->GetFunction()); |
|
|
|
|
NanSetPrototypeTemplate(tpl, "startRead", |
|
|
|
|
FunctionTemplate::New(StartRead)->GetFunction()); |
|
|
|
|
NanAssignPersistent(fun_tpl, tpl); |
|
|
|
|
NanAssignPersistent(constructor, tpl->GetFunction()); |
|
|
|
|
constructor->Set(NanNew("WRITE_BUFFER_HINT"), |
|
|
|
@ -225,211 +499,64 @@ NAN_METHOD(Call::StartBatch) { |
|
|
|
|
if (!args[1]->IsFunction()) { |
|
|
|
|
return NanThrowError("startBatch's second argument must be a callback"); |
|
|
|
|
} |
|
|
|
|
vector<Persistent<Value> *> *handles = new vector<Persistent<Value>>(); |
|
|
|
|
vector<NanUtf8String *> *strings = new vector<NanUtf8String *>(); |
|
|
|
|
Persistent<Value> *handle; |
|
|
|
|
Handle<Array> keys = args[0]->GetOwnPropertyNames(); |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
std::vector<unique_ptr<PersistentHolder> > *handles = |
|
|
|
|
new std::vector<unique_ptr<PersistentHolder> >(); |
|
|
|
|
std::vector<unique_ptr<NanUtf8String> > *strings = |
|
|
|
|
new std::vector<unique_ptr<NanUtf8String> >(); |
|
|
|
|
Persistent<Value> handle; |
|
|
|
|
Handle<Object> obj = args[0]->ToObject(); |
|
|
|
|
Handle<Array> keys = obj->GetOwnPropertyNames(); |
|
|
|
|
size_t nops = keys->Length(); |
|
|
|
|
grpc_op *ops = calloc(nops, sizeof(grpc_op)); |
|
|
|
|
grpc_metadata_array array; |
|
|
|
|
Handle<Object> server_status; |
|
|
|
|
NanUtf8String *str; |
|
|
|
|
grpc_op *ops = new grpc_op[nops]; |
|
|
|
|
std::vector<unique_ptr<Op> > *op_vector = new std::vector<unique_ptr<Op> >(); |
|
|
|
|
for (unsigned int i = 0; i < nops; i++) { |
|
|
|
|
if (!keys->Get(i)->IsUInt32()) { |
|
|
|
|
Op *op; |
|
|
|
|
if (!keys->Get(i)->IsUint32()) { |
|
|
|
|
return NanThrowError( |
|
|
|
|
"startBatch's first argument's keys must be integers"); |
|
|
|
|
} |
|
|
|
|
uint32_t type = keys->Get(i)->UInt32Value(); |
|
|
|
|
ops[i].op = type; |
|
|
|
|
uint32_t type = keys->Get(i)->Uint32Value(); |
|
|
|
|
ops[i].op = static_cast<grpc_op_type>(type); |
|
|
|
|
switch (type) { |
|
|
|
|
case GRPC_OP_SEND_INITIAL_METADATA: |
|
|
|
|
if (!args[0]->Get(type)->IsObject()) { |
|
|
|
|
return NanThrowError("metadata must be an object"); |
|
|
|
|
} |
|
|
|
|
if (!CreateMetadataArray(args[0]->Get(type)->ToObject(), &array, |
|
|
|
|
strings, handles)) { |
|
|
|
|
return NanThrowError("failed to parse metadata"); |
|
|
|
|
} |
|
|
|
|
ops[i].data.send_initial_metadata.count = array.count; |
|
|
|
|
ops[i].data.send_initial_metadata.metadata = array.metadata; |
|
|
|
|
break |
|
|
|
|
op = new SendMetadataOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_SEND_MESSAGE: |
|
|
|
|
if (!Buffer::HasInstance(args[0]->Get(type))) { |
|
|
|
|
return NanThrowError("message must be a Buffer"); |
|
|
|
|
} |
|
|
|
|
ops[i].data.send_message = BufferToByteBuffer(args[0]->Get(type)); |
|
|
|
|
handle = new Persistent<Value>(); |
|
|
|
|
NanAssignPersistent(*handle, args[0]->Get(type)); |
|
|
|
|
handles->push_back(handle); |
|
|
|
|
op = new SendMessageOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
|
|
|
|
op = new SendClientCloseOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_SEND_STATUS_FROM_SERVER: |
|
|
|
|
if (!args[0]->Get(type)->IsObject()) { |
|
|
|
|
return NanThrowError("server status must be an object"); |
|
|
|
|
} |
|
|
|
|
server_status = args[0]->Get(type)->ToObject(); |
|
|
|
|
if (!server_status->Get("metadata")->IsObject()) { |
|
|
|
|
return NanThrowError("status metadata must be an object"); |
|
|
|
|
} |
|
|
|
|
if (!server_status->Get("code")->IsUInt32()) { |
|
|
|
|
return NanThrowError("status code must be a positive integer"); |
|
|
|
|
} |
|
|
|
|
if (!server_status->Get("details")->IsString()) { |
|
|
|
|
return NanThrowError("status details must be a string"); |
|
|
|
|
} |
|
|
|
|
if (!CreateMetadataArray(server_status->Get("metadata")->ToObject(), |
|
|
|
|
&array, strings, handles)) { |
|
|
|
|
return NanThrowError("Failed to parse status metadata"); |
|
|
|
|
} |
|
|
|
|
ops[i].data.send_status_from_server.trailing_metadata_count = |
|
|
|
|
array.count; |
|
|
|
|
ops[i].data.send_status_from_server.trailing_metadata = array.metadata; |
|
|
|
|
ops[i].data.send_status_from_server.status = |
|
|
|
|
server_status->Get("code")->UInt32Value(); |
|
|
|
|
str = new NanUtf8String(server_status->Get("details")); |
|
|
|
|
strings->push_back(str); |
|
|
|
|
ops[i].data.send_status_from_server.status_details = **str; |
|
|
|
|
op = new SendServerStatusOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_RECV_INITIAL_METADATA: |
|
|
|
|
ops[i].data.recv_initial_metadata = malloc(sizeof(grpc_metadata_array)); |
|
|
|
|
grpc_metadata_array_init(ops[i].data.recv_initial_metadata); |
|
|
|
|
op = new GetMetadataOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_RECV_MESSAGE: |
|
|
|
|
ops[i].data.recv_message = malloc(sizeof(grpc_byte_buffer*)); |
|
|
|
|
op = new ReadMessageOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_RECV_STATUS_ON_CLIENT: |
|
|
|
|
ops[i].data.recv_status_on_client.trailing_metadata = |
|
|
|
|
malloc(sizeof(grpc_metadata_array)); |
|
|
|
|
grpc_metadata_array_init(ops[i].data.recv_status_on_client); |
|
|
|
|
ops[i].data.recv_status_on_client.status = |
|
|
|
|
malloc(sizeof(grpc_status_code)); |
|
|
|
|
ops[i].data.recv_status_on_client.status_details = |
|
|
|
|
malloc(sizeof(char *)); |
|
|
|
|
ops[i].data.recv_status_on_client.status_details_capacity = |
|
|
|
|
malloc(sizeof(size_t)); |
|
|
|
|
op = new ClientStatusOp(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_RECV_CLOSE_ON_SERVER: |
|
|
|
|
ops[i].data.recv_close_on_server = malloc(sizeof(int)); |
|
|
|
|
op = new ServerCloseResponseOp(); |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
default: |
|
|
|
|
return NanThrowError("Argument object had an unrecognized key"); |
|
|
|
|
} |
|
|
|
|
op.ParseOp(obj.get(type), &ops[i], strings, handles); |
|
|
|
|
op_vector.push_back(unique_ptr<Op>(op)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::AddMetadata) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("addMetadata can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
if (!args[0]->IsObject()) { |
|
|
|
|
return NanThrowTypeError("addMetadata's first argument must be an object"); |
|
|
|
|
} |
|
|
|
|
Handle<Object> metadata = args[0]->ToObject(); |
|
|
|
|
Handle<Array> keys(metadata->GetOwnPropertyNames()); |
|
|
|
|
for (unsigned int i = 0; i < keys->Length(); i++) { |
|
|
|
|
Handle<String> current_key(keys->Get(i)->ToString()); |
|
|
|
|
if (!metadata->Get(current_key)->IsArray()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"addMetadata's first argument's values must be arrays"); |
|
|
|
|
} |
|
|
|
|
NanUtf8String utf8_key(current_key); |
|
|
|
|
Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); |
|
|
|
|
for (unsigned int j = 0; j < values->Length(); j++) { |
|
|
|
|
Handle<Value> value = values->Get(j); |
|
|
|
|
grpc_metadata metadata; |
|
|
|
|
grpc_call_error error; |
|
|
|
|
metadata.key = *utf8_key; |
|
|
|
|
if (Buffer::HasInstance(value)) { |
|
|
|
|
metadata.value = Buffer::Data(value); |
|
|
|
|
metadata.value_length = Buffer::Length(value); |
|
|
|
|
error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); |
|
|
|
|
} else if (value->IsString()) { |
|
|
|
|
Handle<String> string_value = value->ToString(); |
|
|
|
|
NanUtf8String utf8_value(string_value); |
|
|
|
|
metadata.value = *utf8_value; |
|
|
|
|
metadata.value_length = string_value->Length(); |
|
|
|
|
gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key, |
|
|
|
|
metadata.value, metadata.value_length); |
|
|
|
|
error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"addMetadata values must be strings or buffers"); |
|
|
|
|
} |
|
|
|
|
if (error != GRPC_CALL_OK) { |
|
|
|
|
return NanThrowError("addMetadata failed", error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::Invoke) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("invoke can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("invoke's first argument must be a function"); |
|
|
|
|
} |
|
|
|
|
if (!args[1]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("invoke's second argument must be a function"); |
|
|
|
|
} |
|
|
|
|
if (!args[2]->IsUint32()) { |
|
|
|
|
return NanThrowTypeError("invoke's third argument must be integer flags"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
unsigned int flags = args[3]->Uint32Value(); |
|
|
|
|
grpc_call_error error = grpc_call_invoke_old( |
|
|
|
|
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), |
|
|
|
|
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("invoke failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::ServerAccept) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("accept can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("accept's first argument must be a function"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
grpc_call_error error = grpc_call_server_accept_old( |
|
|
|
|
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), |
|
|
|
|
CreateTag(args[0], args.This())); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("serverAccept failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::ServerEndInitialMetadata) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"serverEndInitialMetadata can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsUint32()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"serverEndInitialMetadata's second argument must be integer flags"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
unsigned int flags = args[1]->Uint32Value(); |
|
|
|
|
grpc_call_error error = |
|
|
|
|
grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags); |
|
|
|
|
grpc_call_error error = grpc_call_start_batch( |
|
|
|
|
call->wrapped_call, ops, nops, new struct tag(args[1].As<Function>(), |
|
|
|
|
op_vector, nops, handles, |
|
|
|
|
strings)); |
|
|
|
|
if (error != GRPC_CALL_OK) { |
|
|
|
|
return NanThrowError("serverEndInitialMetadata failed", error); |
|
|
|
|
return NanThrowError("startBatch failed", error); |
|
|
|
|
} |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -446,102 +573,5 @@ NAN_METHOD(Call::Cancel) { |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::StartWrite) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("startWrite can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!Buffer::HasInstance(args[0])) { |
|
|
|
|
return NanThrowTypeError("startWrite's first argument must be a Buffer"); |
|
|
|
|
} |
|
|
|
|
if (!args[1]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("startWrite's second argument must be a function"); |
|
|
|
|
} |
|
|
|
|
if (!args[2]->IsUint32()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"startWrite's third argument must be integer flags"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); |
|
|
|
|
unsigned int flags = args[2]->Uint32Value(); |
|
|
|
|
grpc_call_error error = grpc_call_start_write_old( |
|
|
|
|
call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("startWrite failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::StartWriteStatus) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"startWriteStatus can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsUint32()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"startWriteStatus's first argument must be a status code"); |
|
|
|
|
} |
|
|
|
|
if (!args[1]->IsString()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"startWriteStatus's second argument must be a string"); |
|
|
|
|
} |
|
|
|
|
if (!args[2]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError( |
|
|
|
|
"startWriteStatus's third argument must be a function"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
NanUtf8String details(args[1]); |
|
|
|
|
grpc_call_error error = grpc_call_start_write_status_old( |
|
|
|
|
call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details, |
|
|
|
|
CreateTag(args[2], args.This())); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("startWriteStatus failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::WritesDone) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("writesDone can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("writesDone's first argument must be a function"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
grpc_call_error error = grpc_call_writes_done_old( |
|
|
|
|
call->wrapped_call, CreateTag(args[0], args.This())); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("writesDone failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
NAN_METHOD(Call::StartRead) { |
|
|
|
|
NanScope(); |
|
|
|
|
if (!HasInstance(args.This())) { |
|
|
|
|
return NanThrowTypeError("startRead can only be called on Call objects"); |
|
|
|
|
} |
|
|
|
|
if (!args[0]->IsFunction()) { |
|
|
|
|
return NanThrowTypeError("startRead's first argument must be a function"); |
|
|
|
|
} |
|
|
|
|
Call *call = ObjectWrap::Unwrap<Call>(args.This()); |
|
|
|
|
grpc_call_error error = grpc_call_start_read_old( |
|
|
|
|
call->wrapped_call, CreateTag(args[0], args.This())); |
|
|
|
|
if (error == GRPC_CALL_OK) { |
|
|
|
|
CompletionQueueAsyncWorker::Next(); |
|
|
|
|
} else { |
|
|
|
|
return NanThrowError("startRead failed", error); |
|
|
|
|
} |
|
|
|
|
NanReturnUndefined(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace node
|
|
|
|
|
} // namespace grpc
|
|
|
|
|