Clean commit of Node.js library source

pull/16/head
murgatroid99 10 years ago
parent 470a3ea1a1
commit e506151918
  1. 12
      src/node/README.md
  2. 46
      src/node/binding.gyp
  3. 46
      src/node/byte_buffer.cc
  4. 23
      src/node/byte_buffer.h
  5. 384
      src/node/call.cc
  6. 49
      src/node/call.h
  7. 155
      src/node/channel.cc
  8. 46
      src/node/channel.h
  9. 176
      src/node/client.js
  10. 29
      src/node/common.js
  11. 62
      src/node/completion_queue_async_worker.cc
  12. 46
      src/node/completion_queue_async_worker.h
  13. 180
      src/node/credentials.cc
  14. 48
      src/node/credentials.h
  15. 132
      src/node/event.cc
  16. 15
      src/node/event.h
  17. 25
      src/node/examples/math.proto
  18. 168
      src/node/examples/math_server.js
  19. 151
      src/node/node_grpc.cc
  20. 18
      src/node/package.json
  21. 19
      src/node/port_picker.js
  22. 212
      src/node/server.cc
  23. 46
      src/node/server.h
  24. 228
      src/node/server.js
  25. 131
      src/node/server_credentials.cc
  26. 44
      src/node/server_credentials.h
  27. 306
      src/node/surface_client.js
  28. 325
      src/node/surface_server.js
  29. 71
      src/node/tag.cc
  30. 26
      src/node/tag.h
  31. 35
      src/node/test/byte_buffer_test.js~
  32. 169
      src/node/test/call_test.js
  33. 6
      src/node/test/call_test.js~
  34. 55
      src/node/test/channel_test.js
  35. 150
      src/node/test/client_server_test.js
  36. 59
      src/node/test/client_server_test.js~
  37. 30
      src/node/test/completion_queue_test.js~
  38. 97
      src/node/test/constant_test.js
  39. 25
      src/node/test/constant_test.js~
  40. 1
      src/node/test/data/README
  41. 15
      src/node/test/data/ca.pem
  42. 16
      src/node/test/data/server1.key
  43. 16
      src/node/test/data/server1.pem
  44. 168
      src/node/test/end_to_end_test.js
  45. 72
      src/node/test/end_to_end_test.js~
  46. 176
      src/node/test/math_client_test.js
  47. 87
      src/node/test/math_client_test.js~
  48. 88
      src/node/test/server_test.js
  49. 22
      src/node/test/server_test.js~
  50. 33
      src/node/timeval.cc
  51. 15
      src/node/timeval.h

@ -0,0 +1,12 @@
# Node.js GRPC extension
The package is built with
node-gyp configure
node-gyp build
or, for brevity
node-gyp configure build
The tests can be run with `npm test` on a dev install.

@ -0,0 +1,46 @@
{
"targets" : [
{
'include_dirs': [
"<!(node -e \"require('nan')\")"
],
'cxxflags': [
'-Wall',
'-pthread',
'-pedantic',
'-g',
'-zdefs'
'-Werror',
],
'ldflags': [
'-g',
'-L/usr/local/google/home/mlumish/grpc_dev/lib'
],
'link_settings': {
'libraries': [
'-lgrpc',
'-levent',
'-levent_pthreads',
'-levent_core',
'-lrt',
'-lgpr',
'-lpthread'
],
},
"target_name": "grpc",
"sources": [
"byte_buffer.cc",
"call.cc",
"channel.cc",
"completion_queue_async_worker.cc",
"credentials.cc",
"event.cc",
"node_grpc.cc",
"server.cc",
"server_credentials.cc",
"tag.cc",
"timeval.cc"
]
}
]
}

@ -0,0 +1,46 @@
#include <string.h>
#include <malloc.h>
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/support/slice.h"
namespace grpc {
namespace node {
#include "byte_buffer.h"
using ::node::Buffer;
using v8::Handle;
using v8::Value;
grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) {
NanScope();
int length = Buffer::Length(buffer);
char *data = Buffer::Data(buffer);
gpr_slice slice = gpr_slice_malloc(length);
memcpy(GPR_SLICE_START_PTR(slice), data, length);
grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1));
gpr_slice_unref(slice);
return byte_buffer;
}
Handle<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
NanEscapableScope();
if (buffer == NULL) {
NanReturnNull();
}
size_t length = grpc_byte_buffer_length(buffer);
char *result = reinterpret_cast<char*>(calloc(length, sizeof(char)));
size_t offset = 0;
grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
gpr_slice next;
while (grpc_byte_buffer_reader_next(reader, &next) != 0) {
memcpy(result+offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
offset += GPR_SLICE_LENGTH(next);
}
return NanEscapeScope(NanNewBufferHandle(result, length));
}
} // namespace node
} // namespace grpc

@ -0,0 +1,23 @@
#ifndef NET_GRPC_NODE_BYTE_BUFFER_H_
#define NET_GRPC_NODE_BYTE_BUFFER_H_
#include <string.h>
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
namespace grpc {
namespace node {
/* Convert a Node.js Buffer to grpc_byte_buffer. Requires that
::node::Buffer::HasInstance(buffer) */
grpc_byte_buffer *BufferToByteBuffer(v8::Handle<v8::Value> buffer);
/* Convert a grpc_byte_buffer to a Node.js Buffer */
v8::Handle<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer);
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_BYTE_BUFFER_H_

@ -0,0 +1,384 @@
#include <node.h>
#include "grpc/grpc.h"
#include "grpc/support/time.h"
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
#include "completion_queue_async_worker.h"
#include "timeval.h"
#include "tag.h"
namespace grpc {
namespace node {
using ::node::Buffer;
using v8::Arguments;
using v8::Array;
using v8::Exception;
using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
using v8::Uint32;
using v8::String;
using v8::Value;
Persistent<Function> Call::constructor;
Persistent<FunctionTemplate> Call::fun_tpl;
Call::Call(grpc_call *call) : wrapped_call(call) {
}
Call::~Call() {
grpc_call_destroy(wrapped_call);
}
void Call::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("Call"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
NanSetPrototypeTemplate(tpl, "startInvoke",
FunctionTemplate::New(StartInvoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
tpl,
"serverEndInitialMetadata",
FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction());
NanSetPrototypeTemplate(tpl, "cancel",
FunctionTemplate::New(Cancel)->GetFunction());
NanSetPrototypeTemplate(tpl, "startWrite",
FunctionTemplate::New(StartWrite)->GetFunction());
NanSetPrototypeTemplate(
tpl, "startWriteStatus",
FunctionTemplate::New(StartWriteStatus)->GetFunction());
NanSetPrototypeTemplate(tpl, "writesDone",
FunctionTemplate::New(WritesDone)->GetFunction());
NanSetPrototypeTemplate(tpl, "startReadMetadata",
FunctionTemplate::New(WritesDone)->GetFunction());
NanSetPrototypeTemplate(tpl, "startRead",
FunctionTemplate::New(StartRead)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
constructor->Set(NanNew("WRITE_BUFFER_HINT"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
constructor->Set(NanNew("WRITE_NO_COMPRESS"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
exports->Set(String::NewSymbol("Call"), constructor);
}
bool Call::HasInstance(Handle<Value> val) {
NanScope();
return NanHasInstance(fun_tpl, val);
}
Handle<Value> Call::WrapStruct(grpc_call *call) {
NanEscapableScope();
if (call == NULL) {
return NanEscapeScope(NanNull());
}
const int argc = 1;
Handle<Value> argv[argc] = { External::New(reinterpret_cast<void*>(call)) };
return NanEscapeScope(constructor->NewInstance(argc, argv));
}
NAN_METHOD(Call::New) {
NanScope();
if (args.IsConstructCall()) {
Call *call;
if (args[0]->IsExternal()) {
// This option is used for wrapping an existing call
grpc_call *call_value = reinterpret_cast<grpc_call*>(
External::Unwrap(args[0]));
call = new Call(call_value);
} else {
if (!Channel::HasInstance(args[0])) {
return NanThrowTypeError("Call's first argument must be a Channel");
}
if (!args[1]->IsString()) {
return NanThrowTypeError("Call's second argument must be a string");
}
if (!(args[2]->IsNumber() || args[2]->IsDate())) {
return NanThrowTypeError(
"Call's third argument must be a date or a number");
}
Handle<Object> channel_object = args[0]->ToObject();
Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
if (channel->GetWrappedChannel() == NULL) {
return NanThrowError("Call cannot be created from a closed channel");
}
NanUtf8String method(args[1]);
double deadline = args[2]->NumberValue();
grpc_channel *wrapped_channel = channel->GetWrappedChannel();
grpc_call *wrapped_call = grpc_channel_create_call(
wrapped_channel,
*method,
channel->GetHost(),
MillisecondsToTimespec(deadline));
call = new Call(wrapped_call);
args.This()->SetHiddenValue(String::NewSymbol("channel_"),
channel_object);
}
call->Wrap(args.This());
NanReturnValue(args.This());
} else {
const int argc = 4;
Local<Value> argv[argc] = { args[0], args[1], args[2], args[3] };
NanReturnValue(constructor->NewInstance(argc, argv));
}
}
NAN_METHOD(Call::AddMetadata) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"addMetadata can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
for (int i=0; !args[i]->IsUndefined(); i++) {
if (!args[i]->IsObject()) {
return NanThrowTypeError(
"addMetadata arguments must be objects with key and value");
}
Handle<Object> item = args[i]->ToObject();
Handle<Value> key = item->Get(NanNew("key"));
if (!key->IsString()) {
return NanThrowTypeError(
"objects passed to addMetadata must have key->string");
}
Handle<Value> value = item->Get(NanNew("value"));
if (!Buffer::HasInstance(value)) {
return NanThrowTypeError(
"objects passed to addMetadata must have value->Buffer");
}
grpc_metadata metadata;
NanUtf8String utf8_key(key);
metadata.key = *utf8_key;
metadata.value = Buffer::Data(value);
metadata.value_length = Buffer::Length(value);
grpc_call_error error = grpc_call_add_metadata(call->wrapped_call,
&metadata,
0);
if (error != GRPC_CALL_OK) {
return NanThrowError("addMetadata failed", error);
}
}
NanReturnUndefined();
}
NAN_METHOD(Call::StartInvoke) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
return NanThrowTypeError(
"StartInvoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
return NanThrowTypeError(
"StartInvoke's second argument must be a function");
}
if (!args[2]->IsFunction()) {
return NanThrowTypeError(
"StartInvoke's third argument must be a function");
}
if (!args[3]->IsUint32()) {
return NanThrowTypeError(
"StartInvoke's fourth argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
grpc_call_error error = grpc_call_start_invoke(
call->wrapped_call,
CompletionQueueAsyncWorker::GetQueue(),
CreateTag(args[0], args.This()),
CreateTag(args[1], args.This()),
CreateTag(args[2], args.This()),
flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("startInvoke failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::ServerAccept) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("accept can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
return NanThrowTypeError(
"accept's first argument must be a function");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_server_accept(
call->wrapped_call,
CompletionQueueAsyncWorker::GetQueue(),
CreateTag(args[0], args.This()));
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("serverAccept failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::ServerEndInitialMetadata) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"serverEndInitialMetadata can only be called on Call objects");
}
if (!args[0]->IsUint32()) {
return NanThrowTypeError(
"serverEndInitialMetadata's second argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[1]->Uint32Value();
grpc_call_error error = grpc_call_server_end_initial_metadata(
call->wrapped_call,
flags);
if (error != GRPC_CALL_OK) {
return NanThrowError("serverEndInitialMetadata failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
if (error != GRPC_CALL_OK) {
return NanThrowError("cancel failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::StartWrite) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startWrite can only be called on Call objects");
}
if (!Buffer::HasInstance(args[0])) {
return NanThrowTypeError(
"startWrite's first argument must be a Buffer");
}
if (!args[1]->IsFunction()) {
return NanThrowTypeError(
"startWrite's second argument must be a function");
}
if (!args[2]->IsUint32()) {
return NanThrowTypeError(
"startWrite's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]);
unsigned int flags = args[2]->Uint32Value();
grpc_call_error error = grpc_call_start_write(call->wrapped_call,
buffer,
CreateTag(args[1], args.This()),
flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("startWrite failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::StartWriteStatus) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"startWriteStatus can only be called on Call objects");
}
if (!args[0]->IsUint32()) {
return NanThrowTypeError(
"startWriteStatus's first argument must be a status code");
}
if (!args[1]->IsString()) {
return NanThrowTypeError(
"startWriteStatus's second argument must be a string");
}
if (!args[2]->IsFunction()) {
return NanThrowTypeError(
"startWriteStatus's third argument must be a function");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
NanUtf8String details(args[1]);
grpc_call_error error = grpc_call_start_write_status(
call->wrapped_call,
(grpc_status_code)args[0]->Uint32Value(),
*details,
CreateTag(args[2], args.This()));
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("startWriteStatus failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::WritesDone) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("writesDone can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
return NanThrowTypeError(
"writesDone's first argument must be a function");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_writes_done(
call->wrapped_call,
CreateTag(args[0], args.This()));
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("writesDone failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Call::StartRead) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startRead can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
return NanThrowTypeError(
"startRead's first argument must be a function");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_start_read(call->wrapped_call,
CreateTag(args[0], args.This()));
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("startRead failed", error);
}
NanReturnUndefined();
}
} // namespace node
} // namespace grpc

@ -0,0 +1,49 @@
#ifndef NET_GRPC_NODE_CALL_H_
#define NET_GRPC_NODE_CALL_H_
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "channel.h"
namespace grpc {
namespace node {
/* Wrapper class for grpc_call structs. */
class Call : public ::node::ObjectWrap {
public:
static void Init(v8::Handle<v8::Object> exports);
static bool HasInstance(v8::Handle<v8::Value> val);
/* Wrap a grpc_call struct in a javascript object */
static v8::Handle<v8::Value> WrapStruct(grpc_call *call);
private:
explicit Call(grpc_call *call);
~Call();
// Prevent copying
Call(const Call&);
Call& operator=(const Call&);
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
static NAN_METHOD(StartInvoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);
static NAN_METHOD(StartWrite);
static NAN_METHOD(StartWriteStatus);
static NAN_METHOD(WritesDone);
static NAN_METHOD(StartRead);
static v8::Persistent<v8::Function> constructor;
// Used for typechecking instances of this javascript class
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_call *wrapped_call;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_CALL_H_

@ -0,0 +1,155 @@
#include <malloc.h>
#include <vector>
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "channel.h"
#include "credentials.h"
namespace grpc {
namespace node {
using v8::Arguments;
using v8::Array;
using v8::Exception;
using v8::Function;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
Persistent<Function> Channel::constructor;
Persistent<FunctionTemplate> Channel::fun_tpl;
Channel::Channel(grpc_channel *channel, NanUtf8String *host)
: wrapped_channel(channel), host(host) {
}
Channel::~Channel() {
if (wrapped_channel != NULL) {
grpc_channel_destroy(wrapped_channel);
}
delete host;
}
void Channel::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("Channel"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "close",
FunctionTemplate::New(Close)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
exports->Set(NanNew("Channel"), constructor);
}
bool Channel::HasInstance(Handle<Value> val) {
NanScope();
return NanHasInstance(fun_tpl, val);
}
grpc_channel *Channel::GetWrappedChannel() {
return this->wrapped_channel;
}
char *Channel::GetHost() {
return **this->host;
}
NAN_METHOD(Channel::New) {
NanScope();
if (args.IsConstructCall()) {
if (!args[0]->IsString()) {
return NanThrowTypeError("Channel expects a string and an object");
}
grpc_channel *wrapped_channel;
// Owned by the Channel object
NanUtf8String *host = new NanUtf8String(args[0]);
if (args[1]->IsUndefined()) {
wrapped_channel = grpc_channel_create(**host, NULL);
} else if (args[1]->IsObject()) {
grpc_credentials *creds = NULL;
Handle<Object> args_hash(args[1]->ToObject()->Clone());
if (args_hash->HasOwnProperty(NanNew("credentials"))) {
Handle<Value> creds_value = args_hash->Get(NanNew("credentials"));
if (!Credentials::HasInstance(creds_value)) {
return NanThrowTypeError(
"credentials arg must be a Credentials object");
}
Credentials *creds_object = ObjectWrap::Unwrap<Credentials>(
creds_value->ToObject());
creds = creds_object->GetWrappedCredentials();
args_hash->Delete(NanNew("credentials"));
}
Handle<Array> keys(args_hash->GetOwnPropertyNames());
grpc_channel_args channel_args;
channel_args.num_args = keys->Length();
channel_args.args = reinterpret_cast<grpc_arg*>(
calloc(channel_args.num_args, sizeof(grpc_arg)));
/* These are used to keep all strings until then end of the block, then
destroy them */
std::vector<NanUtf8String*> key_strings(keys->Length());
std::vector<NanUtf8String*> value_strings(keys->Length());
for (unsigned int i = 0; i < channel_args.num_args; i++) {
Handle<String> current_key(keys->Get(i)->ToString());
Handle<Value> current_value(args_hash->Get(current_key));
key_strings[i] = new NanUtf8String(current_key);
channel_args.args[i].key = **key_strings[i];
if (current_value->IsInt32()) {
channel_args.args[i].type = GRPC_ARG_INTEGER;
channel_args.args[i].value.integer = current_value->Int32Value();
} else if (current_value->IsString()) {
channel_args.args[i].type = GRPC_ARG_STRING;
value_strings[i] = new NanUtf8String(current_value);
channel_args.args[i].value.string = **value_strings[i];
} else {
free(channel_args.args);
return NanThrowTypeError("Arg values must be strings");
}
}
if (creds == NULL) {
wrapped_channel = grpc_channel_create(**host, &channel_args);
} else {
wrapped_channel = grpc_secure_channel_create(creds,
**host,
&channel_args);
}
free(channel_args.args);
} else {
return NanThrowTypeError("Channel expects a string and an object");
}
Channel *channel = new Channel(wrapped_channel, host);
channel->Wrap(args.This());
NanReturnValue(args.This());
} else {
const int argc = 2;
Local<Value> argv[argc] = { args[0], args[1] };
NanReturnValue(constructor->NewInstance(argc, argv));
}
}
NAN_METHOD(Channel::Close) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("close can only be called on Channel objects");
}
Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
if (channel->wrapped_channel != NULL) {
grpc_channel_destroy(channel->wrapped_channel);
channel->wrapped_channel = NULL;
}
NanReturnUndefined();
}
} // namespace node
} // namespace grpc

@ -0,0 +1,46 @@
#ifndef NET_GRPC_NODE_CHANNEL_H_
#define NET_GRPC_NODE_CHANNEL_H_
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
namespace grpc {
namespace node {
/* Wrapper class for grpc_channel structs */
class Channel : public ::node::ObjectWrap {
public:
static void Init(v8::Handle<v8::Object> exports);
static bool HasInstance(v8::Handle<v8::Value> val);
/* This is used to typecheck javascript objects before converting them to
this type */
static v8::Persistent<v8::Value> prototype;
/* Returns the grpc_channel struct that this object wraps */
grpc_channel *GetWrappedChannel();
/* Return the hostname that this channel connects to */
char *GetHost();
private:
explicit Channel(grpc_channel *channel, NanUtf8String *host);
~Channel();
// Prevent copying
Channel(const Channel&);
Channel& operator=(const Channel&);
static NAN_METHOD(New);
static NAN_METHOD(Close);
static v8::Persistent<v8::Function> constructor;
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_channel *wrapped_channel;
NanUtf8String *host;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_CHANNEL_H_

@ -0,0 +1,176 @@
var grpc = require('bindings')('grpc.node');
var common = require('./common');
var Duplex = require('stream').Duplex;
var util = require('util');
util.inherits(GrpcClientStream, Duplex);
/**
* Class for representing a gRPC client side stream as a Node stream. Extends
* from stream.Duplex.
* @constructor
* @param {grpc.Call} call Call object to proxy
* @param {object} options Stream options
*/
function GrpcClientStream(call, options) {
Duplex.call(this, options);
var self = this;
// Indicates that we can start reading and have not received a null read
var can_read = false;
// Indicates that a read is currently pending
var reading = false;
// Indicates that we can call startWrite
var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
/**
* Callback to handle receiving a READ event. Pushes the data from that event
* onto the read queue and starts reading again if applicable.
* @param {grpc.Event} event The READ event object
*/
function readCallback(event) {
var data = event.data;
if (self.push(data)) {
if (data == null) {
// Disable starting to read after null read was received
can_read = false;
reading = false;
} else {
call.startRead(readCallback);
}
} else {
// Indicate that reading can be resumed by calling startReading
reading = false;
}
};
/**
* Initiate a read, which continues until self.push returns false (indicating
* that reading should be paused) or data is null (indicating that there is no
* more data to read).
*/
function startReading() {
call.startRead(readCallback);
}
// TODO(mlumish): possibly change queue implementation due to shift slowness
var write_queue = [];
/**
* Write the next chunk of data in the write queue if there is one. Otherwise
* indicate that there is no pending write. When the write succeeds, this
* function is called again.
*/
function writeNext() {
if (write_queue.length > 0) {
writing = true;
var next = write_queue.shift();
var writeCallback = function(event) {
next.callback();
writeNext();
};
call.startWrite(next.chunk, writeCallback, 0);
} else {
writing = false;
}
}
call.startInvoke(function(event) {
can_read = true;
can_write = true;
startReading();
writeNext();
}, function(event) {
self.emit('metadata', event.data);
}, function(event) {
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
* Indicate that reads should start, and start them if the INVOKE_ACCEPTED
* event has been received.
*/
this._enableRead = function() {
if (!reading) {
reading = true;
if (can_read) {
startReading();
}
}
};
/**
* Push the chunk onto the write queue, and write from the write queue if
* there is not a pending write
* @param {Buffer} chunk The chunk of data to write
* @param {function(Error=)} callback The callback to call when the write
* completes
*/
this._tryWrite = function(chunk, callback) {
write_queue.push({chunk: chunk, callback: callback});
if (can_write && !writing) {
writeNext();
}
};
}
/**
* Start reading. This is an implementation of a method needed for implementing
* stream.Readable.
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
this._enableRead();
};
/**
* Attempt to write the given chunk. Calls the callback when done. This is an
* implementation of a method needed for implementing stream.Writable.
* @param {Buffer} chunk The chunk to write
* @param {string} encoding Ignored
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
this._tryWrite(chunk, callback);
};
/**
* Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request
* @param {string} method The method to request
* @param {array=} metadata Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future.
* @return {stream=} The stream of responses
*/
function makeRequest(channel,
method,
metadata,
deadline) {
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(channel, method, deadline);
if (metadata) {
call.addMetadata(metadata);
}
return new GrpcClientStream(call);
}
/**
* See documentation for makeRequest above
*/
exports.makeRequest = makeRequest;
/**
* Represents a client side gRPC channel associated with a single host.
*/
exports.Channel = grpc.Channel;
/**
* Status name to code number mapping
*/
exports.status = grpc.status;
/**
* Call error name to code number mapping
*/
exports.callError = grpc.callError;

@ -0,0 +1,29 @@
var _ = require('highland');
/**
* When the given stream finishes without error, call the callback once. This
* will not be called until something begins to consume the stream.
* @param {function} callback The callback to call at stream end
* @param {stream} source The stream to watch
* @return {stream} The stream with the callback attached
*/
function onSuccessfulStreamEnd(callback, source) {
var error = false;
return source.consume(function(err, x, push, next) {
if (x === _.nil) {
if (!error) {
callback();
}
push(null, x);
} else if (err) {
error = true;
push(err);
next();
} else {
push(err, x);
next();
}
});
}
exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd;

@ -0,0 +1,62 @@
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/support/time.h"
#include "completion_queue_async_worker.h"
#include "event.h"
#include "tag.h"
namespace grpc {
namespace node {
using v8::Function;
using v8::Handle;
using v8::Object;
using v8::Persistent;
using v8::Value;
grpc_completion_queue *CompletionQueueAsyncWorker::queue;
CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() :
NanAsyncWorker(NULL) {
}
CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {
}
void CompletionQueueAsyncWorker::Execute() {
result = grpc_completion_queue_next(queue, gpr_inf_future);
}
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() {
return queue;
}
void CompletionQueueAsyncWorker::Next() {
NanScope();
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker);
}
void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
NanScope();
queue = grpc_completion_queue_create();
}
void CompletionQueueAsyncWorker::HandleOKCallback() {
NanScope();
NanCallback event_callback(GetTagHandle(result->tag).As<Function>());
Handle<Value> argv[] = {
CreateEventObject(result)
};
DestroyTag(result->tag);
grpc_event_finish(result);
result = NULL;
event_callback.Call(1, argv);
}
} // namespace node
} // namespace grpc

@ -0,0 +1,46 @@
#ifndef NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_
#define NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_
#include <nan.h>
#include "grpc/grpc.h"
namespace grpc {
namespace node {
/* A worker that asynchronously calls completion_queue_next, and queues onto the
node event loop a call to the function stored in the event's tag. */
class CompletionQueueAsyncWorker : public NanAsyncWorker {
public:
CompletionQueueAsyncWorker();
~CompletionQueueAsyncWorker();
/* Calls completion_queue_next with the provided deadline, and stores the
event if there was one or sets an error message if there was not */
void Execute();
/* Returns the completion queue attached to this class */
static grpc_completion_queue *GetQueue();
/* Convenience function to create a worker with the given arguments and queue
it to run asynchronously */
static void Next();
/* Initialize the CompletionQueueAsyncWorker class */
static void Init(v8::Handle<v8::Object> exports);
protected:
/* Called when Execute has succeeded (completed without setting an error
message). Calls the saved callback with the event that came from
completion_queue_next */
void HandleOKCallback();
private:
grpc_event *result;
static grpc_completion_queue *queue;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_

@ -0,0 +1,180 @@
#include <node.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
#include "credentials.h"
namespace grpc {
namespace node {
using ::node::Buffer;
using v8::Arguments;
using v8::Exception;
using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
using v8::Value;
Persistent<Function> Credentials::constructor;
Persistent<FunctionTemplate> Credentials::fun_tpl;
Credentials::Credentials(grpc_credentials *credentials)
: wrapped_credentials(credentials) {
}
Credentials::~Credentials() {
gpr_log(GPR_DEBUG, "Destroying credentials object");
grpc_credentials_release(wrapped_credentials);
}
void Credentials::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("Credentials"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
constructor->Set(NanNew("createDefault"),
FunctionTemplate::New(CreateDefault)->GetFunction());
constructor->Set(NanNew("createSsl"),
FunctionTemplate::New(CreateSsl)->GetFunction());
constructor->Set(NanNew("createComposite"),
FunctionTemplate::New(CreateComposite)->GetFunction());
constructor->Set(NanNew("createGce"),
FunctionTemplate::New(CreateGce)->GetFunction());
constructor->Set(NanNew("createFake"),
FunctionTemplate::New(CreateFake)->GetFunction());
constructor->Set(NanNew("createIam"),
FunctionTemplate::New(CreateIam)->GetFunction());
exports->Set(NanNew("Credentials"), constructor);
}
bool Credentials::HasInstance(Handle<Value> val) {
NanScope();
return NanHasInstance(fun_tpl, val);
}
Handle<Value> Credentials::WrapStruct(grpc_credentials *credentials) {
NanEscapableScope();
if (credentials == NULL) {
return NanEscapeScope(NanNull());
}
const int argc = 1;
Handle<Value> argv[argc] = {
External::New(reinterpret_cast<void*>(credentials)) };
return NanEscapeScope(constructor->NewInstance(argc, argv));
}
grpc_credentials *Credentials::GetWrappedCredentials() {
return wrapped_credentials;
}
NAN_METHOD(Credentials::New) {
NanScope();
if (args.IsConstructCall()) {
if (!args[0]->IsExternal()) {
return NanThrowTypeError(
"Credentials can only be created with the provided functions");
}
grpc_credentials *creds_value = reinterpret_cast<grpc_credentials*>(
External::Unwrap(args[0]));
Credentials *credentials = new Credentials(creds_value);
credentials->Wrap(args.This());
NanReturnValue(args.This());
} else {
const int argc = 1;
Local<Value> argv[argc] = { args[0] };
NanReturnValue(constructor->NewInstance(argc, argv));
}
}
NAN_METHOD(Credentials::CreateDefault) {
NanScope();
NanReturnValue(WrapStruct(grpc_default_credentials_create()));
}
NAN_METHOD(Credentials::CreateSsl) {
NanScope();
char *root_certs;
char *private_key = NULL;
char *cert_chain = NULL;
int root_certs_length, private_key_length = 0, cert_chain_length = 0;
if (!Buffer::HasInstance(args[0])) {
return NanThrowTypeError(
"createSsl's first argument must be a Buffer");
}
root_certs = Buffer::Data(args[0]);
root_certs_length = Buffer::Length(args[0]);
if (Buffer::HasInstance(args[1])) {
private_key = Buffer::Data(args[1]);
private_key_length = Buffer::Length(args[1]);
} else if (!(args[1]->IsNull() || args[1]->IsUndefined())) {
return NanThrowTypeError(
"createSSl's second argument must be a Buffer if provided");
}
if (Buffer::HasInstance(args[2])) {
cert_chain = Buffer::Data(args[2]);
cert_chain_length = Buffer::Length(args[2]);
} else if (!(args[2]->IsNull() || args[2]->IsUndefined())) {
return NanThrowTypeError(
"createSSl's third argument must be a Buffer if provided");
}
NanReturnValue(WrapStruct(grpc_ssl_credentials_create(
reinterpret_cast<unsigned char*>(root_certs), root_certs_length,
reinterpret_cast<unsigned char*>(private_key), private_key_length,
reinterpret_cast<unsigned char*>(cert_chain), cert_chain_length)));
}
NAN_METHOD(Credentials::CreateComposite) {
NanScope();
if (!HasInstance(args[0])) {
return NanThrowTypeError(
"createComposite's first argument must be a Credentials object");
}
if (!HasInstance(args[1])) {
return NanThrowTypeError(
"createComposite's second argument must be a Credentials object");
}
Credentials *creds1 = ObjectWrap::Unwrap<Credentials>(args[0]->ToObject());
Credentials *creds2 = ObjectWrap::Unwrap<Credentials>(args[1]->ToObject());
NanReturnValue(WrapStruct(grpc_composite_credentials_create(
creds1->wrapped_credentials, creds2->wrapped_credentials)));
}
NAN_METHOD(Credentials::CreateGce) {
NanScope();
NanReturnValue(WrapStruct(grpc_compute_engine_credentials_create()));
}
NAN_METHOD(Credentials::CreateFake) {
NanScope();
NanReturnValue(WrapStruct(grpc_fake_transport_security_credentials_create()));
}
NAN_METHOD(Credentials::CreateIam) {
NanScope();
if (!args[0]->IsString()) {
return NanThrowTypeError(
"createIam's first argument must be a string");
}
if (!args[1]->IsString()) {
return NanThrowTypeError(
"createIam's second argument must be a string");
}
NanUtf8String auth_token(args[0]);
NanUtf8String auth_selector(args[1]);
NanReturnValue(WrapStruct(grpc_iam_credentials_create(*auth_token,
*auth_selector)));
}
} // namespace node
} // namespace grpc

@ -0,0 +1,48 @@
#ifndef NET_GRPC_NODE_CREDENTIALS_H_
#define NET_GRPC_NODE_CREDENTIALS_H_
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
namespace grpc {
namespace node {
/* Wrapper class for grpc_credentials structs */
class Credentials : public ::node::ObjectWrap {
public:
static void Init(v8::Handle<v8::Object> exports);
static bool HasInstance(v8::Handle<v8::Value> val);
/* Wrap a grpc_credentials struct in a javascript object */
static v8::Handle<v8::Value> WrapStruct(grpc_credentials *credentials);
/* Returns the grpc_credentials struct that this object wraps */
grpc_credentials *GetWrappedCredentials();
private:
explicit Credentials(grpc_credentials *credentials);
~Credentials();
// Prevent copying
Credentials(const Credentials&);
Credentials& operator=(const Credentials&);
static NAN_METHOD(New);
static NAN_METHOD(CreateDefault);
static NAN_METHOD(CreateSsl);
static NAN_METHOD(CreateComposite);
static NAN_METHOD(CreateGce);
static NAN_METHOD(CreateFake);
static NAN_METHOD(CreateIam);
static v8::Persistent<v8::Function> constructor;
// Used for typechecking instances of this javascript class
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_credentials *wrapped_credentials;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_CREDENTIALS_H_

@ -0,0 +1,132 @@
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "byte_buffer.h"
#include "call.h"
#include "event.h"
#include "tag.h"
#include "timeval.h"
namespace grpc {
namespace node {
using v8::Array;
using v8::Date;
using v8::Handle;
using v8::HandleScope;
using v8::Number;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
Handle<Value> GetEventData(grpc_event *event) {
NanEscapableScope();
size_t count;
grpc_metadata *items;
Handle<Array> metadata;
Handle<Object> status;
Handle<Object> rpc_new;
switch (event->type) {
case GRPC_READ:
return NanEscapeScope(ByteBufferToBuffer(event->data.read));
case GRPC_INVOKE_ACCEPTED:
return NanEscapeScope(NanNew<Number>(event->data.invoke_accepted));
case GRPC_WRITE_ACCEPTED:
return NanEscapeScope(NanNew<Number>(event->data.write_accepted));
case GRPC_FINISH_ACCEPTED:
return NanEscapeScope(NanNew<Number>(event->data.finish_accepted));
case GRPC_CLIENT_METADATA_READ:
count = event->data.client_metadata_read.count;
items = event->data.client_metadata_read.elements;
metadata = NanNew<Array>(static_cast<int>(count));
for (unsigned int i = 0; i < count; i++) {
Handle<Object> item_obj = NanNew<Object>();
item_obj->Set(NanNew<String, const char *>("key"),
NanNew<String, char *>(items[i].key));
item_obj->Set(NanNew<String, const char *>("value"),
NanNew<String, char *>(
items[i].value,
static_cast<int>(items[i].value_length)));
metadata->Set(i, item_obj);
}
return NanEscapeScope(metadata);
case GRPC_FINISHED:
status = NanNew<Object>();
status->Set(NanNew("code"), NanNew<Number>(
event->data.finished.status));
if (event->data.finished.details != NULL) {
status->Set(NanNew("details"), String::New(
event->data.finished.details));
}
count = event->data.finished.metadata_count;
items = event->data.finished.metadata_elements;
metadata = NanNew<Array>(static_cast<int>(count));
for (unsigned int i = 0; i < count; i++) {
Handle<Object> item_obj = NanNew<Object>();
item_obj->Set(NanNew<String, const char *>("key"),
NanNew<String, char *>(items[i].key));
item_obj->Set(NanNew<String, const char *>("value"),
NanNew<String, char *>(
items[i].value,
static_cast<int>(items[i].value_length)));
metadata->Set(i, item_obj);
}
status->Set(NanNew("metadata"), metadata);
return NanEscapeScope(status);
case GRPC_SERVER_RPC_NEW:
rpc_new = NanNew<Object>();
if (event->data.server_rpc_new.method == NULL) {
return NanEscapeScope(NanNull());
}
rpc_new->Set(NanNew<String, const char *>("method"),
NanNew<String, const char *>(
event->data.server_rpc_new.method));
rpc_new->Set(NanNew<String, const char *>("host"),
NanNew<String, const char *>(
event->data.server_rpc_new.host));
rpc_new->Set(NanNew<String, const char *>("absolute_deadline"),
NanNew<Date>(TimespecToMilliseconds(
event->data.server_rpc_new.deadline)));
count = event->data.server_rpc_new.metadata_count;
items = event->data.server_rpc_new.metadata_elements;
metadata = NanNew<Array>(static_cast<int>(count));
for (unsigned int i = 0; i < count; i++) {
Handle<Object> item_obj = Object::New();
item_obj->Set(NanNew<String, const char *>("key"),
NanNew<String, char *>(items[i].key));
item_obj->Set(NanNew<String, const char *>("value"),
NanNew<String, char *>(
items[i].value,
static_cast<int>(items[i].value_length)));
metadata->Set(i, item_obj);
}
rpc_new->Set(NanNew<String, const char *>("metadata"), metadata);
return NanEscapeScope(rpc_new);
default:
return NanEscapeScope(NanNull());
}
}
Handle<Value> CreateEventObject(grpc_event *event) {
NanEscapableScope();
if (event == NULL) {
return NanEscapeScope(NanNull());
}
Handle<Object> event_obj = NanNew<Object>();
Handle<Value> call;
if (TagHasCall(event->tag)) {
call = TagGetCall(event->tag);
} else {
call = Call::WrapStruct(event->call);
}
event_obj->Set(NanNew<String, const char *>("call"), call);
event_obj->Set(NanNew<String, const char *>("type"),
NanNew<Number>(event->type));
event_obj->Set(NanNew<String, const char *>("data"), GetEventData(event));
return NanEscapeScope(event_obj);
}
} // namespace node
} // namespace grpc

@ -0,0 +1,15 @@
#ifndef NET_GRPC_NODE_EVENT_H_
#define NET_GRPC_NODE_EVENT_H_
#include <node.h>
#include "grpc/grpc.h"
namespace grpc {
namespace node {
v8::Handle<v8::Value> CreateEventObject(grpc_event *event);
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_EVENT_H_

@ -0,0 +1,25 @@
syntax = "proto2";
package math;
message DivArgs {
required int64 dividend = 1;
required int64 divisor = 2;
}
message DivReply {
required int64 quotient = 1;
required int64 remainder = 2;
}
message FibArgs {
optional int64 limit = 1;
}
message Num {
required int64 num = 1;
}
message FibReply {
required int64 count = 1;
}

@ -0,0 +1,168 @@
var _ = require('underscore');
var ProtoBuf = require('protobufjs');
var fs = require('fs');
var util = require('util');
var Transform = require('stream').Transform;
var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto');
var math = builder.build('math');
var makeConstructor = require('../surface_server.js').makeServerConstructor;
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/* This function call creates a server constructor for servers that that expose
* the four specified methods. This specifies how to serialize messages that the
* server sends and deserialize messages that the client sends, and whether the
* client or the server will send a stream of messages, for each method. This
* also specifies a prefix that will be added to method names when sending them
* on the wire. This function call and all of the preceding code in this file
* are intended to approximate what the generated code will look like for the
* math service */
var Server = makeConstructor({
Div: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: false,
server_stream: false
},
Fib: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.FibArgs),
client_stream: false,
server_stream: true
},
Sum: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.Num),
client_stream: true,
server_stream: false
},
DivMany: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: true,
server_stream: true
}
}, '/Math/');
/**
* Server function for division. Provides the /Math/DivMany and /Math/Div
* functions (Div is just DivMany with only one stream element). For each
* DivArgs parameter, responds with a DivReply with the results of the division
* @param {Object} call The object containing request and cancellation info
* @param {function(Error, *)} cb Response callback
*/
function mathDiv(call, cb) {
var req = call.request;
if (req.divisor == 0) {
cb(new Error('cannot divide by zero'));
}
cb(null, {
quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor
});
}
/**
* Server function for Fibonacci numbers. Provides the /Math/Fib function. Reads
* a single parameter that indicates the number of responses, and then responds
* with a stream of that many Fibonacci numbers.
* @param {stream} stream The stream for sending responses.
*/
function mathFib(stream) {
// Here, call is a standard writable Node object Stream
var previous = 0, current = 1;
for (var i = 0; i < stream.request.limit; i++) {
stream.write({num: current});
var temp = current;
current += previous;
previous = temp;
}
stream.end();
}
/**
* Server function for summation. Provides the /Math/Sum function. Reads a
* stream of number parameters, then responds with their sum.
* @param {stream} call The stream of arguments.
* @param {function(Error, *)} cb Response callback
*/
function mathSum(call, cb) {
// Here, call is a standard readable Node object Stream
var sum = 0;
call.on('data', function(data) {
sum += data.num | 0;
});
call.on('end', function() {
cb(null, {num: sum});
});
}
function mathDivMany(stream) {
// Here, call is a standard duplex Node object Stream
util.inherits(DivTransform, Transform);
function DivTransform() {
var options = {objectMode: true};
Transform.call(this, options);
}
DivTransform.prototype._transform = function(div_args, encoding, callback) {
if (div_args.divisor == 0) {
callback(new Error('cannot divide by zero'));
}
callback(null, {
quotient: div_args.dividend / div_args.divisor,
remainder: div_args.dividend % div_args.divisor
});
};
var transform = new DivTransform();
stream.pipe(transform);
transform.pipe(stream);
}
var server = new Server({
Div: mathDiv,
Fib: mathFib,
Sum: mathSum,
DivMany: mathDivMany
});
if (require.main === module) {
server.bind('localhost:7070').listen();
}
/**
* See docs for server
*/
module.exports = server;

@ -0,0 +1,151 @@
#include <node.h>
#include <nan.h>
#include <v8.h>
#include "grpc/grpc.h"
#include "call.h"
#include "channel.h"
#include "event.h"
#include "server.h"
#include "completion_queue_async_worker.h"
#include "credentials.h"
#include "server_credentials.h"
using v8::Handle;
using v8::Value;
using v8::Object;
using v8::Uint32;
using v8::String;
void InitStatusConstants(Handle<Object> exports) {
NanScope();
Handle<Object> status = Object::New();
exports->Set(NanNew("status"), status);
Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_STATUS_OK));
status->Set(NanNew("OK"), OK);
Handle<Value> CANCELLED(NanNew<Uint32, uint32_t>(GRPC_STATUS_CANCELLED));
status->Set(NanNew("CANCELLED"), CANCELLED);
Handle<Value> UNKNOWN(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNKNOWN));
status->Set(NanNew("UNKNOWN"), UNKNOWN);
Handle<Value> INVALID_ARGUMENT(
NanNew<Uint32, uint32_t>(GRPC_STATUS_INVALID_ARGUMENT));
status->Set(NanNew("INVALID_ARGUMENT"), INVALID_ARGUMENT);
Handle<Value> DEADLINE_EXCEEDED(
NanNew<Uint32, uint32_t>(GRPC_STATUS_DEADLINE_EXCEEDED));
status->Set(NanNew("DEADLINE_EXCEEDED"), DEADLINE_EXCEEDED);
Handle<Value> NOT_FOUND(NanNew<Uint32, uint32_t>(GRPC_STATUS_NOT_FOUND));
status->Set(NanNew("NOT_FOUND"), NOT_FOUND);
Handle<Value> ALREADY_EXISTS(
NanNew<Uint32, uint32_t>(GRPC_STATUS_ALREADY_EXISTS));
status->Set(NanNew("ALREADY_EXISTS"), ALREADY_EXISTS);
Handle<Value> PERMISSION_DENIED(
NanNew<Uint32, uint32_t>(GRPC_STATUS_PERMISSION_DENIED));
status->Set(NanNew("PERMISSION_DENIED"), PERMISSION_DENIED);
Handle<Value> UNAUTHENTICATED(
NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAUTHENTICATED));
status->Set(NanNew("UNAUTHENTICATED"), UNAUTHENTICATED);
Handle<Value> RESOURCE_EXHAUSTED(
NanNew<Uint32, uint32_t>(GRPC_STATUS_RESOURCE_EXHAUSTED));
status->Set(NanNew("RESOURCE_EXHAUSTED"), RESOURCE_EXHAUSTED);
Handle<Value> FAILED_PRECONDITION(
NanNew<Uint32, uint32_t>(GRPC_STATUS_FAILED_PRECONDITION));
status->Set(NanNew("FAILED_PRECONDITION"), FAILED_PRECONDITION);
Handle<Value> ABORTED(NanNew<Uint32, uint32_t>(GRPC_STATUS_ABORTED));
status->Set(NanNew("ABORTED"), ABORTED);
Handle<Value> OUT_OF_RANGE(
NanNew<Uint32, uint32_t>(GRPC_STATUS_OUT_OF_RANGE));
status->Set(NanNew("OUT_OF_RANGE"), OUT_OF_RANGE);
Handle<Value> UNIMPLEMENTED(
NanNew<Uint32, uint32_t>(GRPC_STATUS_UNIMPLEMENTED));
status->Set(NanNew("UNIMPLEMENTED"), UNIMPLEMENTED);
Handle<Value> INTERNAL(NanNew<Uint32, uint32_t>(GRPC_STATUS_INTERNAL));
status->Set(NanNew("INTERNAL"), INTERNAL);
Handle<Value> UNAVAILABLE(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAVAILABLE));
status->Set(NanNew("UNAVAILABLE"), UNAVAILABLE);
Handle<Value> DATA_LOSS(NanNew<Uint32, uint32_t>(GRPC_STATUS_DATA_LOSS));
status->Set(NanNew("DATA_LOSS"), DATA_LOSS);
}
void InitCallErrorConstants(Handle<Object> exports) {
NanScope();
Handle<Object> call_error = Object::New();
exports->Set(NanNew("callError"), call_error);
Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_CALL_OK));
call_error->Set(NanNew("OK"), OK);
Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR));
call_error->Set(NanNew("ERROR"), ERROR);
Handle<Value> NOT_ON_SERVER(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_SERVER));
call_error->Set(NanNew("NOT_ON_SERVER"), NOT_ON_SERVER);
Handle<Value> NOT_ON_CLIENT(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_CLIENT));
call_error->Set(NanNew("NOT_ON_CLIENT"), NOT_ON_CLIENT);
Handle<Value> ALREADY_INVOKED(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_INVOKED));
call_error->Set(NanNew("ALREADY_INVOKED"), ALREADY_INVOKED);
Handle<Value> NOT_INVOKED(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_INVOKED));
call_error->Set(NanNew("NOT_INVOKED"), NOT_INVOKED);
Handle<Value> ALREADY_FINISHED(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_FINISHED));
call_error->Set(NanNew("ALREADY_FINISHED"), ALREADY_FINISHED);
Handle<Value> TOO_MANY_OPERATIONS(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
call_error->Set(NanNew("TOO_MANY_OPERATIONS"),
TOO_MANY_OPERATIONS);
Handle<Value> INVALID_FLAGS(
NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_INVALID_FLAGS));
call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS);
}
void InitOpErrorConstants(Handle<Object> exports) {
NanScope();
Handle<Object> op_error = Object::New();
exports->Set(NanNew("opError"), op_error);
Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK));
op_error->Set(NanNew("OK"), OK);
Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR));
op_error->Set(NanNew("ERROR"), ERROR);
}
void InitCompletionTypeConstants(Handle<Object> exports) {
NanScope();
Handle<Object> completion_type = Object::New();
exports->Set(NanNew("completionType"), completion_type);
Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN));
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED);
Handle<Value> CLIENT_METADATA_READ(
NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ));
completion_type->Set(NanNew("CLIENT_METADATA_READ"),
CLIENT_METADATA_READ);
Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED));
completion_type->Set(NanNew("FINISHED"), FINISHED);
Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW));
completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW);
}
void init(Handle<Object> exports) {
NanScope();
grpc_init();
InitStatusConstants(exports);
InitCallErrorConstants(exports);
InitOpErrorConstants(exports);
InitCompletionTypeConstants(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
grpc::node::Server::Init(exports);
grpc::node::CompletionQueueAsyncWorker::Init(exports);
grpc::node::Credentials::Init(exports);
grpc::node::ServerCredentials::Init(exports);
}
NODE_MODULE(grpc, init)

@ -0,0 +1,18 @@
{
"name": "grpc",
"version": "0.1.0",
"description": "gRPC Library for Node",
"scripts": {
"test": "./node_modules/mocha/bin/mocha"
},
"dependencies": {
"bindings": "^1.2.1",
"nan": "~1.3.0",
"underscore": "^1.7.0"
},
"devDependencies": {
"mocha": "~1.21.0",
"highland": "~2.0.0",
"protobufjs": "~3.8.0"
}
}

@ -0,0 +1,19 @@
var net = require('net');
/**
* Finds a free port that a server can bind to, in the format
* "address:port"
* @param {function(string)} cb The callback that should execute when the port
* is available
*/
function nextAvailablePort(cb) {
var server = net.createServer();
server.listen(function() {
var address = server.address();
server.close(function() {
cb(address.address + ':' + address.port.toString());
});
});
}
exports.nextAvailablePort = nextAvailablePort;

@ -0,0 +1,212 @@
#include "server.h"
#include <node.h>
#include <nan.h>
#include <malloc.h>
#include <vector>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "call.h"
#include "completion_queue_async_worker.h"
#include "tag.h"
#include "server_credentials.h"
namespace grpc {
namespace node {
using v8::Arguments;
using v8::Array;
using v8::Boolean;
using v8::Exception;
using v8::Function;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
Persistent<Function> Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
Server::Server(grpc_server *server) : wrapped_server(server) {
}
Server::~Server() {
grpc_server_destroy(wrapped_server);
}
void Server::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(String::NewSymbol("Server"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "requestCall",
FunctionTemplate::New(RequestCall)->GetFunction());
NanSetPrototypeTemplate(tpl, "addHttp2Port",
FunctionTemplate::New(AddHttp2Port)->GetFunction());
NanSetPrototypeTemplate(tpl, "addSecureHttp2Port",
FunctionTemplate::New(
AddSecureHttp2Port)->GetFunction());
NanSetPrototypeTemplate(tpl, "start",
FunctionTemplate::New(Start)->GetFunction());
NanSetPrototypeTemplate(tpl, "shutdown",
FunctionTemplate::New(Shutdown)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
exports->Set(String::NewSymbol("Server"), constructor);
}
bool Server::HasInstance(Handle<Value> val) {
return NanHasInstance(fun_tpl, val);
}
NAN_METHOD(Server::New) {
NanScope();
/* If this is not a constructor call, make a constructor call and return
the result */
if (!args.IsConstructCall()) {
const int argc = 1;
Local<Value> argv[argc] = { args[0] };
NanReturnValue(constructor->NewInstance(argc, argv));
}
grpc_server *wrapped_server;
grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
if (args[0]->IsUndefined()) {
wrapped_server = grpc_server_create(queue, NULL);
} else if (args[0]->IsObject()) {
grpc_server_credentials *creds = NULL;
Handle<Object> args_hash(args[0]->ToObject()->Clone());
if (args_hash->HasOwnProperty(NanNew("credentials"))) {
Handle<Value> creds_value = args_hash->Get(NanNew("credentials"));
if (!ServerCredentials::HasInstance(creds_value)) {
return NanThrowTypeError(
"credentials arg must be a ServerCredentials object");
}
ServerCredentials *creds_object = ObjectWrap::Unwrap<ServerCredentials>(
creds_value->ToObject());
creds = creds_object->GetWrappedServerCredentials();
args_hash->Delete(NanNew("credentials"));
}
Handle<Array> keys(args_hash->GetOwnPropertyNames());
grpc_channel_args channel_args;
channel_args.num_args = keys->Length();
channel_args.args = reinterpret_cast<grpc_arg*>(
calloc(channel_args.num_args, sizeof(grpc_arg)));
/* These are used to keep all strings until then end of the block, then
destroy them */
std::vector<NanUtf8String*> key_strings(keys->Length());
std::vector<NanUtf8String*> value_strings(keys->Length());
for (unsigned int i = 0; i < channel_args.num_args; i++) {
Handle<String> current_key(keys->Get(i)->ToString());
Handle<Value> current_value(args_hash->Get(current_key));
key_strings[i] = new NanUtf8String(current_key);
channel_args.args[i].key = **key_strings[i];
if (current_value->IsInt32()) {
channel_args.args[i].type = GRPC_ARG_INTEGER;
channel_args.args[i].value.integer = current_value->Int32Value();
} else if (current_value->IsString()) {
channel_args.args[i].type = GRPC_ARG_STRING;
value_strings[i] = new NanUtf8String(current_value);
channel_args.args[i].value.string = **value_strings[i];
} else {
free(channel_args.args);
return NanThrowTypeError("Arg values must be strings");
}
}
if (creds == NULL) {
wrapped_server = grpc_server_create(queue,
&channel_args);
} else {
wrapped_server = grpc_secure_server_create(creds,
queue,
&channel_args);
}
free(channel_args.args);
} else {
return NanThrowTypeError("Server expects an object");
}
Server *server = new Server(wrapped_server);
server->Wrap(args.This());
NanReturnValue(args.This());
}
NAN_METHOD(Server::RequestCall) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("requestCall can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
grpc_call_error error = grpc_server_request_call(
server->wrapped_server,
CreateTag(args[0], NanNull()));
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("requestCall failed", error);
}
NanReturnUndefined();
}
NAN_METHOD(Server::AddHttp2Port) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("addHttp2Port can only be called on a Server");
}
if (!args[0]->IsString()) {
return NanThrowTypeError("addHttp2Port's argument must be a String");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
NanReturnValue(NanNew<Boolean>(grpc_server_add_http2_port(
server->wrapped_server,
*NanUtf8String(args[0]))));
}
NAN_METHOD(Server::AddSecureHttp2Port) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"addSecureHttp2Port can only be called on a Server");
}
if (!args[0]->IsString()) {
return NanThrowTypeError("addSecureHttp2Port's argument must be a String");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
NanReturnValue(NanNew<Boolean>(grpc_server_add_secure_http2_port(
server->wrapped_server,
*NanUtf8String(args[0]))));
}
NAN_METHOD(Server::Start) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("start can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
grpc_server_start(server->wrapped_server);
NanReturnUndefined();
}
NAN_METHOD(Server::Shutdown) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("shutdown can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
grpc_server_shutdown(server->wrapped_server);
NanReturnUndefined();
}
} // namespace node
} // namespace grpc

@ -0,0 +1,46 @@
#ifndef NET_GRPC_NODE_SERVER_H_
#define NET_GRPC_NODE_SERVER_H_
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
namespace grpc {
namespace node {
/* Wraps grpc_server as a JavaScript object. Provides a constructor
and wrapper methods for grpc_server_create, grpc_server_request_call,
grpc_server_add_http2_port, and grpc_server_start. */
class Server : public ::node::ObjectWrap {
public:
/* Initializes the Server class and exposes the constructor and
wrapper methods to JavaScript */
static void Init(v8::Handle<v8::Object> exports);
/* Tests whether the given value was constructed by this class's
JavaScript constructor */
static bool HasInstance(v8::Handle<v8::Value> val);
private:
explicit Server(grpc_server *server);
~Server();
// Prevent copying
Server(const Server&);
Server& operator=(const Server&);
static NAN_METHOD(New);
static NAN_METHOD(RequestCall);
static NAN_METHOD(AddHttp2Port);
static NAN_METHOD(AddSecureHttp2Port);
static NAN_METHOD(Start);
static NAN_METHOD(Shutdown);
static v8::Persistent<v8::Function> constructor;
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_server *wrapped_server;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_SERVER_H_

@ -0,0 +1,228 @@
var grpc = require('bindings')('grpc.node');
var common = require('./common');
var Duplex = require('stream').Duplex;
var util = require('util');
util.inherits(GrpcServerStream, Duplex);
/**
* Class for representing a gRPC server side stream as a Node stream. Extends
* from stream.Duplex.
* @constructor
* @param {grpc.Call} call Call object to proxy
* @param {object} options Stream options
*/
function GrpcServerStream(call, options) {
Duplex.call(this, options);
this._call = call;
// Indicate that a status has been sent
var finished = false;
var self = this;
var status = {
'code' : grpc.status.OK,
'details' : 'OK'
};
/**
* Send the pending status
*/
function sendStatus() {
call.startWriteStatus(status.code, status.details, function() {
});
finished = true;
}
this.on('finish', sendStatus);
/**
* Set the pending status to a given error status. If the error does not have
* code or details properties, the code will be set to grpc.status.INTERNAL
* and the details will be set to 'Unknown Error'.
* @param {Error} err The error object
*/
function setStatus(err) {
var code = grpc.status.INTERNAL;
var details = 'Unknown Error';
if (err.hasOwnProperty('code')) {
code = err.code;
if (err.hasOwnProperty('details')) {
details = err.details;
}
}
status = {'code': code, 'details': details};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
* all pending writes, and sending the given error as a status
* @param {Error} err The error object
* @this GrpcServerStream
*/
function terminateCall(err) {
// Drain readable data
this.on('data', function() {});
setStatus(err);
this.end();
}
this.on('error', terminateCall);
// Indicates that a read is pending
var reading = false;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
function readCallback(event) {
if (finished) {
self.push(null);
return;
}
var data = event.data;
if (self.push(data) && data != null) {
self._call.startRead(readCallback);
} else {
reading = false;
}
}
/**
* Start reading if there is not already a pending read. Reading will
* continue until self.push returns false (indicating reads should slow
* down) or the read data is null (indicating that there is no more data).
*/
this.startReading = function() {
if (finished) {
self.push(null);
} else {
if (!reading) {
reading = true;
self._call.startRead(readCallback);
}
}
};
}
/**
* Start reading from the gRPC data source. This is an implementation of a
* method required for implementing stream.Readable
* @param {number} size Ignored
*/
GrpcServerStream.prototype._read = function(size) {
this.startReading();
};
/**
* Start writing a chunk of data. This is an implementation of a method required
* for implementing stream.Writable.
* @param {Buffer} chunk The chunk of data to write
* @param {string} encoding Ignored
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
var self = this;
self._call.startWrite(chunk, function(event) {
callback();
}, 0);
};
/**
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
* @param {Array} options Options that should be passed to the internal server
* implementation
*/
function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
var started = false;
/**
* Start the server and begin handling requests
* @this Server
*/
this.start = function() {
if (this.started) {
throw 'Server is already running';
}
server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
* the requested method, use that handler to respond to the request. Then
* wait for the next request
* @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
*/
function handleNewCall(event) {
var call = event.call;
var data = event.data;
if (data == null) {
return;
}
server.requestCall(handleNewCall);
var handler = undefined;
var deadline = data.absolute_deadline;
var cancelled = false;
if (handlers.hasOwnProperty(data.method)) {
handler = handlers[data.method];
}
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
}
}, 0);
call.serverEndInitialMetadata(0);
var stream = new GrpcServerStream(call);
Object.defineProperty(stream, 'cancelled', {
get: function() { return cancelled;}
});
try {
handler(stream, data.metadata);
} catch (e) {
stream.emit('error', e);
}
}
server.requestCall(handleNewCall);
};
/** Shuts down the server.
*/
this.shutdown = function() {
server.shutdown();
};
}
/**
* Registers a handler to handle the named method. Fails if there already is
* a handler for the given method. Returns true on success
* @param {string} name The name of the method that the provided function should
* handle/respond to.
* @param {function} handler Function that takes a stream of request values and
* returns a stream of response values
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
*/
Server.prototype.register = function(name, handler) {
if (this.handlers.hasOwnProperty(name)) {
return false;
}
this.handlers[name] = handler;
return true;
};
/**
* Binds the server to the given port, with SSL enabled if secure is specified
* @param {string} port The port that the server should bind on, in the format
* "address:port"
* @param {boolean=} secure Whether the server should open a secure port
*/
Server.prototype.bind = function(port, secure) {
if (secure) {
this._server.addSecureHttp2Port(port);
} else {
this._server.addHttp2Port(port);
}
};
/**
* See documentation for Server
*/
module.exports = Server;

@ -0,0 +1,131 @@
#include <node.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
#include "server_credentials.h"
namespace grpc {
namespace node {
using ::node::Buffer;
using v8::Arguments;
using v8::Exception;
using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
using v8::Value;
Persistent<Function> ServerCredentials::constructor;
Persistent<FunctionTemplate> ServerCredentials::fun_tpl;
ServerCredentials::ServerCredentials(grpc_server_credentials *credentials)
: wrapped_credentials(credentials) {
}
ServerCredentials::~ServerCredentials() {
gpr_log(GPR_DEBUG, "Destroying server credentials object");
grpc_server_credentials_release(wrapped_credentials);
}
void ServerCredentials::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("ServerCredentials"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
constructor->Set(NanNew("createSsl"),
FunctionTemplate::New(CreateSsl)->GetFunction());
constructor->Set(NanNew("createFake"),
FunctionTemplate::New(CreateFake)->GetFunction());
exports->Set(NanNew("ServerCredentials"), constructor);
}
bool ServerCredentials::HasInstance(Handle<Value> val) {
NanScope();
return NanHasInstance(fun_tpl, val);
}
Handle<Value> ServerCredentials::WrapStruct(
grpc_server_credentials *credentials) {
NanEscapableScope();
if (credentials == NULL) {
return NanEscapeScope(NanNull());
}
const int argc = 1;
Handle<Value> argv[argc] = {
External::New(reinterpret_cast<void*>(credentials)) };
return NanEscapeScope(constructor->NewInstance(argc, argv));
}
grpc_server_credentials *ServerCredentials::GetWrappedServerCredentials() {
return wrapped_credentials;
}
NAN_METHOD(ServerCredentials::New) {
NanScope();
if (args.IsConstructCall()) {
if (!args[0]->IsExternal()) {
return NanThrowTypeError(
"ServerCredentials can only be created with the provide functions");
}
grpc_server_credentials *creds_value =
reinterpret_cast<grpc_server_credentials*>(External::Unwrap(args[0]));
ServerCredentials *credentials = new ServerCredentials(creds_value);
credentials->Wrap(args.This());
NanReturnValue(args.This());
} else {
const int argc = 1;
Local<Value> argv[argc] = { args[0] };
NanReturnValue(constructor->NewInstance(argc, argv));
}
}
NAN_METHOD(ServerCredentials::CreateSsl) {
NanScope();
char *root_certs = NULL;
char *private_key;
char *cert_chain;
int root_certs_length = 0, private_key_length, cert_chain_length;
if (Buffer::HasInstance(args[0])) {
root_certs = Buffer::Data(args[0]);
root_certs_length = Buffer::Length(args[0]);
} else if (!(args[0]->IsNull() || args[0]->IsUndefined())) {
return NanThrowTypeError(
"createSSl's first argument must be a Buffer if provided");
}
if (!Buffer::HasInstance(args[1])) {
return NanThrowTypeError(
"createSsl's second argument must be a Buffer");
}
private_key = Buffer::Data(args[1]);
private_key_length = Buffer::Length(args[1]);
if (!Buffer::HasInstance(args[2])) {
return NanThrowTypeError(
"createSsl's third argument must be a Buffer");
}
cert_chain = Buffer::Data(args[2]);
cert_chain_length = Buffer::Length(args[2]);
NanReturnValue(WrapStruct(grpc_ssl_server_credentials_create(
reinterpret_cast<unsigned char*>(root_certs), root_certs_length,
reinterpret_cast<unsigned char*>(private_key), private_key_length,
reinterpret_cast<unsigned char*>(cert_chain), cert_chain_length)));
}
NAN_METHOD(ServerCredentials::CreateFake) {
NanScope();
NanReturnValue(WrapStruct(
grpc_fake_transport_security_server_credentials_create()));
}
} // namespace node
} // namespace grpc

@ -0,0 +1,44 @@
#ifndef NET_GRPC_NODE_SERVER_CREDENTIALS_H_
#define NET_GRPC_NODE_SERVER_CREDENTIALS_H_
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
namespace grpc {
namespace node {
/* Wrapper class for grpc_server_credentials structs */
class ServerCredentials : public ::node::ObjectWrap {
public:
static void Init(v8::Handle<v8::Object> exports);
static bool HasInstance(v8::Handle<v8::Value> val);
/* Wrap a grpc_server_credentials struct in a javascript object */
static v8::Handle<v8::Value> WrapStruct(grpc_server_credentials *credentials);
/* Returns the grpc_server_credentials struct that this object wraps */
grpc_server_credentials *GetWrappedServerCredentials();
private:
explicit ServerCredentials(grpc_server_credentials *credentials);
~ServerCredentials();
// Prevent copying
ServerCredentials(const ServerCredentials&);
ServerCredentials& operator=(const ServerCredentials&);
static NAN_METHOD(New);
static NAN_METHOD(CreateSsl);
static NAN_METHOD(CreateFake);
static v8::Persistent<v8::Function> constructor;
// Used for typechecking instances of this javascript class
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_server_credentials *wrapped_credentials;
};
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_SERVER_CREDENTIALS_H_

@ -0,0 +1,306 @@
var _ = require('underscore');
var client = require('./client.js');
var EventEmitter = require('events').EventEmitter;
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
function forwardEvent(fromEmitter, toEmitter, event) {
fromEmitter.on(event, function forward() {
_.partial(toEmitter.emit, event).apply(toEmitter, arguments);
});
}
util.inherits(ClientReadableObjectStream, Readable);
/**
* Class for representing a gRPC server streaming call as a Node stream on the
* client side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ClientReadableObjectStream(stream, deserialize, options) {
options = _.extend(options, {objectMode: true});
Readable.call(this, options);
this._stream = stream;
var self = this;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.pause();
}
util.inherits(ClientWritableObjectStream, Writable);
/**
* Class for representing a gRPC client streaming call as a Node stream on the
* client side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {object} options Stream options
*/
function ClientWritableObjectStream(stream, serialize, options) {
options = _.extend(options, {objectMode: true});
Writable.call(this, options);
this._stream = stream;
this._serialize = serialize;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this.on('finish', function() {
this._stream.end();
});
}
util.inherits(ClientBidiObjectStream, Duplex);
/**
* Class for representing a gRPC bidi streaming call as a Node stream on the
* client side. Extends from stream.Duplex.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ClientBidiObjectStream(stream, serialize, deserialize, options) {
options = _.extend(options, {objectMode: true});
Duplex.call(this, options);
this._stream = stream;
this._serialize = serialize;
var self = this;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.pause();
this.on('finish', function() {
this._stream.end();
});
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ClientReadableObjectStream|ClientBidiObjectStream}
* @param {number} size Ignored
*/
function _read(size) {
this._stream.resume();
}
/**
* See docs for _read
*/
ClientReadableObjectStream.prototype._read = _read;
/**
* See docs for _read
*/
ClientBidiObjectStream.prototype._read = _read;
/**
* _write implementation for both types of streams that allow writing
* @this {ClientWritableObjectStream|ClientBidiObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(this._serialize(chunk), encoding, callback);
}
/**
* See docs for _write
*/
ClientWritableObjectStream.prototype._write = _write;
/**
* See docs for _write
*/
ClientBidiObjectStream.prototype._write = _write;
/**
* Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeUnaryRequest
*/
function makeUnaryRequestFunction(method, serialize, deserialize) {
/**
* Make a unary request with this method on the given channel with the given
* argument, callback, etc.
* @param {client.Channel} channel The channel on which to make the request
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeUnaryRequest(channel, argument, callback, metadata, deadline) {
var stream = client.makeRequest(channel, method, metadata, deadline);
var emitter = new EventEmitter();
forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata');
stream.write(serialize(argument));
stream.end();
stream.on('data', function forwardData(chunk) {
try {
callback(null, deserialize(chunk));
} catch (e) {
callback(e);
}
});
return emitter;
}
return makeUnaryRequest;
}
/**
* Get a function that can make client stream requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeClientStreamRequest
*/
function makeClientStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a client stream request with this method on the given channel with the
* given callback, etc.
* @param {client.Channel} channel The channel on which to make the request
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeClientStreamRequest(channel, callback, metadata, deadline) {
var stream = client.makeRequest(channel, method, metadata, deadline);
var obj_stream = new ClientWritableObjectStream(stream, serialize, {});
stream.on('data', function forwardData(chunk) {
try {
callback(null, deserialize(chunk));
} catch (e) {
callback(e);
}
});
return obj_stream;
}
return makeClientStreamRequest;
}
/**
* Get a function that can make server stream requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeServerStreamRequest
*/
function makeServerStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a server stream request with this method on the given channel with the
* given argument, etc.
* @param {client.Channel} channel The channel on which to make the request
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeServerStreamRequest(channel, argument, metadata, deadline) {
var stream = client.makeRequest(channel, method, metadata, deadline);
var obj_stream = new ClientReadableObjectStream(stream, deserialize, {});
stream.write(serialize(argument));
stream.end();
return obj_stream;
}
return makeServerStreamRequest;
}
/**
* Get a function that can make bidirectional stream requests to the specified
* method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeBidiStreamRequest
*/
function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a bidirectional stream request with this method on the given channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeBidiStreamRequest(channel, metadata, deadline) {
var stream = client.makeRequest(channel, method, metadata, deadline);
var obj_stream = new ClientBidiObjectStream(stream,
serialize,
deserialize,
{});
return obj_stream;
}
return makeBidiStreamRequest;
}
/**
* See docs for makeUnaryRequestFunction
*/
exports.makeUnaryRequestFunction = makeUnaryRequestFunction;
/**
* See docs for makeClientStreamRequestFunction
*/
exports.makeClientStreamRequestFunction = makeClientStreamRequestFunction;
/**
* See docs for makeServerStreamRequestFunction
*/
exports.makeServerStreamRequestFunction = makeServerStreamRequestFunction;
/**
* See docs for makeBidiStreamRequestFunction
*/
exports.makeBidiStreamRequestFunction = makeBidiStreamRequestFunction;
/**
* See docs for client.Channel
*/
exports.Channel = client.Channel;
/**
* See docs for client.status
*/
exports.status = client.status;
/**
* See docs for client.callError
*/
exports.callError = client.callError;

@ -0,0 +1,325 @@
var _ = require('underscore');
var Server = require('./server.js');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
util.inherits(ServerReadableObjectStream, Readable);
/**
* Class for representing a gRPC client streaming call as a Node stream on the
* server side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ServerReadableObjectStream(stream, deserialize, options) {
options = _.extend(options, {objectMode: true});
Readable.call(this, options);
this._stream = stream;
Object.defineProperty(this, 'cancelled', {
get: function() { return stream.cancelled; }
});
var self = this;
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.on('end', function forwardEnd() {
self.push(null);
});
this._stream.pause();
}
util.inherits(ServerWritableObjectStream, Writable);
/**
* Class for representing a gRPC server streaming call as a Node stream on the
* server side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {object} options Stream options
*/
function ServerWritableObjectStream(stream, serialize, options) {
options = _.extend(options, {objectMode: true});
Writable.call(this, options);
this._stream = stream;
this._serialize = serialize;
this.on('finish', function() {
this._stream.end();
});
}
util.inherits(ServerBidiObjectStream, Duplex);
/**
* Class for representing a gRPC bidi streaming call as a Node stream on the
* server side. Extends from stream.Duplex.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
* @param {function(*):Buffer} serialize Function for serializing objects
* @param {function(Buffer)} deserialize Function for deserializing binary data
* @param {object} options Stream options
*/
function ServerBidiObjectStream(stream, serialize, deserialize, options) {
options = _.extend(options, {objectMode: true});
Duplex.call(this, options);
this._stream = stream;
this._serialize = serialize;
var self = this;
this._stream.on('data', function forwardData(chunk) {
if (!self.push(deserialize(chunk))) {
self._stream.pause();
}
});
this._stream.on('end', function forwardEnd() {
self.push(null);
});
this._stream.pause();
this.on('finish', function() {
this._stream.end();
});
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ServerReadableObjectStream|ServerBidiObjectStream}
* @param {number} size Ignored
*/
function _read(size) {
this._stream.resume();
}
/**
* See docs for _read
*/
ServerReadableObjectStream.prototype._read = _read;
/**
* See docs for _read
*/
ServerBidiObjectStream.prototype._read = _read;
/**
* _write implementation for both types of streams that allow writing
* @this {ServerWritableObjectStream|ServerBidiObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(this._serialize(chunk), encoding, callback);
}
/**
* See docs for _write
*/
ServerWritableObjectStream.prototype._write = _write;
/**
* See docs for _write
*/
ServerBidiObjectStream.prototype._write = _write;
/**
* Creates a binary stream handler function from a unary handler function
* @param {function(Object, function(Error, *))} handler Unary call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeUnaryHandler(handler, serialize, deserialize) {
/**
* Handles a stream by reading a single data value, passing it to the handler,
* and writing the response back to the stream.
* @param {stream} stream Binary data stream
*/
return function handleUnaryCall(stream) {
stream.on('data', function handleUnaryData(value) {
var call = {request: deserialize(value)};
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(serialize(value));
stream.end();
}
});
});
};
}
/**
* Creates a binary stream handler function from a client stream handler
* function
* @param {function(Readable, function(Error, *))} handler Client stream call
* handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeClientStreamHandler(handler, serialize, deserialize) {
/**
* Handles a stream by passing a deserializing stream to the handler and
* writing the response back to the stream.
* @param {stream} stream Binary data stream
*/
return function handleClientStreamCall(stream) {
var object_stream = new ServerReadableObjectStream(stream, deserialize, {});
handler(object_stream, function sendClientStreamData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(serialize(value));
stream.end();
}
});
};
}
/**
* Creates a binary stream handler function from a server stream handler
* function
* @param {function(Writable)} handler Server stream call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeServerStreamHandler(handler, serialize, deserialize) {
/**
* Handles a stream by attaching it to a serializing stream, and passing it to
* the handler.
* @param {stream} stream Binary data stream
*/
return function handleServerStreamCall(stream) {
stream.on('data', function handleClientData(value) {
var object_stream = new ServerWritableObjectStream(stream,
serialize,
{});
object_stream.request = deserialize(value);
handler(object_stream);
});
};
}
/**
* Creates a binary stream handler function from a bidi stream handler function
* @param {function(Duplex)} handler Unary call handler
* @param {function(*):Buffer} serialize Serialization function
* @param {function(Buffer):*} deserialize Deserialization function
* @return {function(stream)} Binary stream handler
*/
function makeBidiStreamHandler(handler, serialize, deserialize) {
/**
* Handles a stream by wrapping it in a serializing and deserializing object
* stream, and passing it to the handler.
* @param {stream} stream Binary data stream
*/
return function handleBidiStreamCall(stream) {
var object_stream = new ServerBidiObjectStream(stream,
serialize,
deserialize,
{});
handler(object_stream);
};
}
/**
* Map with short names for each of the handler maker functions. Used in
* makeServerConstructor
*/
var handler_makers = {
unary: makeUnaryHandler,
server_stream: makeServerStreamHandler,
client_stream: makeClientStreamHandler,
bidi: makeBidiStreamHandler
};
/**
* Creates a constructor for servers with a service defined by the methods
* object. The methods object has string keys and values of this form:
* {serialize: function, deserialize: function, client_stream: bool,
* server_stream: bool}
* @param {Object} methods Method descriptor for each method the server should
* expose
* @param {string} prefix The prefex to prepend to each method name
* @return {function(Object, Object)} New server constructor
*/
function makeServerConstructor(methods, prefix) {
/**
* Create a server with the given handlers for all of the methods.
* @constructor
* @param {Object} handlers Map from method names to method handlers.
* @param {Object} options Options to pass to the underlying server
*/
function SurfaceServer(handlers, options) {
var server = new Server(options);
this.inner_server = server;
_.each(handlers, function(handler, name) {
var method = methods[name];
var method_type;
if (method.client_stream) {
if (method.server_stream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.server_stream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
var binary_handler = handler_makers[method_type](handler,
method.serialize,
method.deserialize);
server.register('' + prefix + name, binary_handler);
}, this);
}
/**
* Binds the server to the given port, with SSL enabled if secure is specified
* @param {string} port The port that the server should bind on, in the format
* "address:port"
* @param {boolean=} secure Whether the server should open a secure port
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.bind = function(port, secure) {
this.inner_server.bind(port, secure);
return this;
};
/**
* Starts the server listening on any bound ports
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.listen = function() {
this.inner_server.start();
return this;
};
/**
* Shuts the server down; tells it to stop listening for new requests and to
* kill old requests.
*/
SurfaceServer.prototype.shutdown = function() {
this.inner_server.shutdown();
};
return SurfaceServer;
}
/**
* See documentation for makeServerConstructor
*/
exports.makeServerConstructor = makeServerConstructor;

@ -0,0 +1,71 @@
#include <stdlib.h>
#include <node.h>
#include <nan.h>
#include "tag.h"
namespace grpc {
namespace node {
using v8::Handle;
using v8::HandleScope;
using v8::Persistent;
using v8::Value;
struct tag {
tag(Persistent<Value> *tag, Persistent<Value> *call)
: persist_tag(tag), persist_call(call) {
}
~tag() {
persist_tag->Dispose();
if (persist_call != NULL) {
persist_call->Dispose();
}
}
Persistent<Value> *persist_tag;
Persistent<Value> *persist_call;
};
void *CreateTag(Handle<Value> tag, Handle<Value> call) {
NanScope();
Persistent<Value> *persist_tag = new Persistent<Value>();
NanAssignPersistent(*persist_tag, tag);
Persistent<Value> *persist_call;
if (call->IsNull() || call->IsUndefined()) {
persist_call = NULL;
} else {
persist_call = new Persistent<Value>();
NanAssignPersistent(*persist_call, call);
}
struct tag *tag_struct = new struct tag(persist_tag, persist_call);
return reinterpret_cast<void*>(tag_struct);
}
Handle<Value> GetTagHandle(void *tag) {
NanEscapableScope();
struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag);
return NanEscapeScope(tag_value);
}
bool TagHasCall(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
return tag_struct->persist_call != NULL;
}
Handle<Value> TagGetCall(void *tag) {
NanEscapableScope();
struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
if (tag_struct->persist_call == NULL) {
return NanEscapeScope(NanNull());
}
Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call);
return NanEscapeScope(call_value);
}
void DestroyTag(void *tag) {
delete reinterpret_cast<struct tag*>(tag);
}
} // namespace node
} // namespace grpc

@ -0,0 +1,26 @@
#ifndef NET_GRPC_NODE_TAG_H_
#define NET_GRPC_NODE_TAG_H_
#include <node.h>
namespace grpc {
namespace node {
/* Create a void* tag that can be passed to various grpc_call functions from
a javascript value and the javascript wrapper for the call. The call can be
null. */
void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call);
/* Return the javascript value stored in the tag */
v8::Handle<v8::Value> GetTagHandle(void *tag);
/* Returns true if the call was set (non-null) when the tag was created */
bool TagHasCall(void *tag);
/* Returns the javascript wrapper for the call associated with this tag */
v8::Handle<v8::Value> TagGetCall(void *call);
/* Destroy the tag and all resources it is holding. It is illegal to call any
of these other functions on a tag after it has been destroyed. */
void DestroyTag(void *tag);
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_TAG_H_

@ -0,0 +1,35 @@
var assert = require('assert');
var grpc = require('..build/Release/grpc');
describe('byte buffer', function() {
describe('constructor', function() {
it('should reject bad constructor calls', function() {
it('should require at least one argument', function() {
assert.throws(new grpc.ByteBuffer(), TypeError);
});
it('should reject non-string arguments', function() {
assert.throws(new grpc.ByteBuffer(0), TypeError);
assert.throws(new grpc.ByteBuffer(1.5), TypeError);
assert.throws(new grpc.ByteBuffer(null), TypeError);
assert.throws(new grpc.ByteBuffer(Date.now()), TypeError);
});
it('should accept string arguments', function() {
assert.doesNotThrow(new grpc.ByteBuffer(''));
assert.doesNotThrow(new grpc.ByteBuffer('test'));
assert.doesNotThrow(new grpc.ByteBuffer('\0'));
});
});
});
describe('bytes', function() {
it('should return the passed string', function() {
it('should preserve simple strings', function() {
var buffer = new grpc.ByteBuffer('test');
assert.strictEqual(buffer.bytes(), 'test');
});
it('should preserve null characters', function() {
var buffer = new grpc.ByteBuffer('test\0test');
assert.strictEqual(buffer.bytes(), 'test\0test');
});
});
});
});

@ -0,0 +1,169 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var channel = new grpc.Channel('localhost:7070');
/**
* Helper function to return an absolute deadline given a relative timeout in
* seconds.
* @param {number} timeout_secs The number of seconds to wait before timing out
* @return {Date} A date timeout_secs in the future
*/
function getDeadline(timeout_secs) {
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + timeout_secs);
return deadline;
}
describe('call', function() {
describe('constructor', function() {
it('should reject anything less than 3 arguments', function() {
assert.throws(function() {
new grpc.Call();
}, TypeError);
assert.throws(function() {
new grpc.Call(channel);
}, TypeError);
assert.throws(function() {
new grpc.Call(channel, 'method');
}, TypeError);
});
it('should succeed with a Channel, a string, and a date or number',
function() {
assert.doesNotThrow(function() {
new grpc.Call(channel, 'method', new Date());
});
assert.doesNotThrow(function() {
new grpc.Call(channel, 'method', 0);
});
});
it('should fail with a closed channel', function() {
var local_channel = new grpc.Channel('hostname');
local_channel.close();
assert.throws(function() {
new grpc.Call(channel, 'method');
});
});
it('should fail with other types', function() {
assert.throws(function() {
new grpc.Call({}, 'method', 0);
}, TypeError);
assert.throws(function() {
new grpc.Call(channel, null, 0);
}, TypeError);
assert.throws(function() {
new grpc.Call(channel, 'method', 'now');
}, TypeError);
});
});
describe('addMetadata', function() {
it('should succeed with objects containing keys and values', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.doesNotThrow(function() {
call.addMetadata();
});
assert.doesNotThrow(function() {
call.addMetadata({'key' : 'key',
'value' : new Buffer('value')});
});
assert.doesNotThrow(function() {
call.addMetadata({'key' : 'key1',
'value' : new Buffer('value1')},
{'key' : 'key2',
'value' : new Buffer('value2')});
});
});
it('should fail with other parameter types', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.addMetadata(null);
}, TypeError);
assert.throws(function() {
call.addMetadata('value');
}, TypeError);
assert.throws(function() {
call.addMetadata(5);
}, TypeError);
});
it('should fail if startInvoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {},
function() {},
function() {done();},
0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
return err.code === grpc.callError.ALREADY_INVOKED;
});
// Cancel to speed up the test
call.cancel();
});
});
describe('startInvoke', function() {
it('should fail with fewer than 4 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.startInvoke();
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {});
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {});
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {},
function() {});
}, TypeError);
});
it('should work with 3 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {},
function() {},
function() {done();},
0);
// Cancel to speed up the test
call.cancel();
});
});
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.startInvoke(0, 0, 0, 0);
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {},
function() {}, 'test');
});
});
});
describe('serverAccept', function() {
it('should fail with fewer than 1 argument1', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.serverAccept();
}, TypeError);
});
it('should return an error when called on a client Call', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.serverAccept(function() {});
}, function(err) {
return err.code === grpc.callError.NOT_ON_CLIENT;
});
});
});
describe('cancel', function() {
it('should succeed', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.doesNotThrow(function() {
call.cancel();
});
});
});
});

@ -0,0 +1,6 @@
var assert = require('assert');
var grpc = require('../build/Release/grpc');
describe('call', function() {
describe('constructor', function() {
it('should reject anything less than 4 arguments', function() {

@ -0,0 +1,55 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
describe('channel', function() {
describe('constructor', function() {
it('should require a string for the first argument', function() {
assert.doesNotThrow(function() {
new grpc.Channel('hostname');
});
assert.throws(function() {
new grpc.Channel();
}, TypeError);
assert.throws(function() {
new grpc.Channel(5);
});
});
it('should accept an object for the second parameter', function() {
assert.doesNotThrow(function() {
new grpc.Channel('hostname', {});
});
assert.throws(function() {
new grpc.Channel('hostname', 5);
});
});
it('should only accept objects with string or int values', function() {
assert.doesNotThrow(function() {
new grpc.Channel('hostname', {'key' : 'value'});
});
assert.doesNotThrow(function() {
new grpc.Channel('hostname', {'key' : 5});
});
assert.throws(function() {
new grpc.Channel('hostname', {'key' : null});
});
assert.throws(function() {
new grpc.Channel('hostname', {'key' : new Date()});
});
});
});
describe('close', function() {
it('should succeed silently', function() {
var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
});
});
it('should be idempotent', function() {
var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
channel.close();
});
});
});
});

@ -0,0 +1,150 @@
var assert = require('assert');
var fs = require('fs');
var path = require('path');
var grpc = require('bindings')('grpc.node');
var Server = require('../server');
var client = require('../client');
var port_picker = require('../port_picker');
var common = require('../common');
var _ = require('highland');
var ca_path = path.join(__dirname, 'data/ca.pem');
var key_path = path.join(__dirname, 'data/server1.key');
var pem_path = path.join(__dirname, 'data/server1.pem');
/**
* Helper function to return an absolute deadline given a relative timeout in
* seconds.
* @param {number} timeout_secs The number of seconds to wait before timing out
* @return {Date} A date timeout_secs in the future
*/
function getDeadline(timeout_secs) {
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + timeout_secs);
return deadline;
}
/**
* Responds to every request with the same data as a response
* @param {Stream} stream
*/
function echoHandler(stream) {
stream.pipe(stream);
}
/**
* Responds to every request with an error status
* @param {Stream} stream
*/
function errorHandler(stream) {
throw {
'code' : grpc.status.UNIMPLEMENTED,
'details' : 'error details'
};
}
describe('echo client', function() {
it('should receive echo responses', function(done) {
port_picker.nextAvailablePort(function(port) {
var server = new Server();
server.bind(port);
server.register('echo', echoHandler);
server.start();
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var channel = new grpc.Channel(port);
var stream = client.makeRequest(
channel,
'echo');
_(messages).map(function(val) {
return new Buffer(val);
}).pipe(stream);
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk.toString());
index += 1;
});
stream.on('end', function() {
server.shutdown();
done();
});
});
});
it('should get an error status that the server throws', function(done) {
port_picker.nextAvailablePort(function(port) {
var server = new Server();
server.bind(port);
server.register('error', errorHandler);
server.start();
var channel = new grpc.Channel(port);
var stream = client.makeRequest(
channel,
'error',
null,
getDeadline(1));
stream.on('data', function() {});
stream.write(new Buffer('test'));
stream.end();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.UNIMPLEMENTED);
assert.equal(status.details, 'error details');
server.shutdown();
done();
});
});
});
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
describe('secure echo client', function() {
it('should recieve echo responses', function(done) {
port_picker.nextAvailablePort(function(port) {
fs.readFile(ca_path, function(err, ca_data) {
assert.ifError(err);
fs.readFile(key_path, function(err, key_data) {
assert.ifError(err);
fs.readFile(pem_path, function(err, pem_data) {
assert.ifError(err);
var creds = grpc.Credentials.createSsl(ca_data);
var server_creds = grpc.ServerCredentials.createSsl(null,
key_data,
pem_data);
var server = new Server({'credentials' : server_creds});
server.bind(port, true);
server.register('echo', echoHandler);
server.start();
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var channel = new grpc.Channel(port, {
'grpc.ssl_target_name_override' : 'foo.test.google.com',
'credentials' : creds
});
var stream = client.makeRequest(
channel,
'echo');
_(messages).map(function(val) {
return new Buffer(val);
}).pipe(stream);
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk.toString());
index += 1;
});
stream.on('end', function() {
server.shutdown();
done();
});
});
});
});
});
});
});

@ -0,0 +1,59 @@
var assert = require('assert');
var grpc = require('../build/Debug/grpc');
var Server = require('../server');
var client = require('../client');
var port_picker = require('../port_picker');
var iterators = require('async-iterators');
/**
* General function to process an event by checking that there was no error and
* calling the callback passed as a tag.
* @param {*} err Truthy values indicate an error (in this case, that there was
* no event available).
* @param {grpc.Event} event The event to process.
*/
function processEvent(err, event) {
assert.ifError(err);
assert.notEqual(event, null);
event.getTag()(event);
}
/**
* Responds to every request with the same data as a response
* @param {{next:function(function(*, Buffer))}} arg_iter The async iterator of
* arguments.
* @return {{next:function(function(*, Buffer))}} The async iterator of results
*/
function echoHandler(arg_iter) {
return {
'next' : function(write) {
arg_iter.next(function(err, value) {
if (value == undefined) {
write({
'code' : grpc.status.OK,
'details' : 'OK'
});
} else {
write(err, value);
}
});
}
};
}
describe('echo client server', function() {
it('should recieve echo responses', function(done) {
port_picker.nextAvailablePort(function(port) {
var server = new Server(port);
server.register('echo', echoHandler);
server.start();
var messages = ['echo1', 'echo2', 'echo3'];
var channel = new grpc.Channel(port);
var responses = client.makeRequest(channel,
'echo',
iterators.fromArray(messages));
assert.equal(messages, iterators.toArray(responses));
});
});
});

@ -0,0 +1,30 @@
var assert = require('assert');
var grpc = require('../build/Release/grpc');
describe('completion queue', function() {
describe('constructor', function() {
it('should succeed with now arguments', function() {
assert.doesNotThrow(function() {
new grpc.CompletionQueue();
});
});
});
describe('next', function() {
it('should require a date parameter', function() {
var queue = new grpc.CompletionQueue();
assert.throws(function() {
queue->next();
}, TypeError);
assert.throws(function() {
queue->next('test');
}, TypeError);
assert.doesNotThrow(function() {
queue->next(Date.now());
});
});
it('should return null from a new queue', function() {
var queue = new grpc.CompletionQueue();
assert.strictEqual(queue->next(Date.now()), null);
});
});
});

@ -0,0 +1,97 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
/**
* List of all status names
* @const
* @type {Array.<string>}
*/
var statusNames = [
'OK',
'CANCELLED',
'UNKNOWN',
'INVALID_ARGUMENT',
'DEADLINE_EXCEEDED',
'NOT_FOUND',
'ALREADY_EXISTS',
'PERMISSION_DENIED',
'UNAUTHENTICATED',
'RESOURCE_EXHAUSTED',
'FAILED_PRECONDITION',
'ABORTED',
'OUT_OF_RANGE',
'UNIMPLEMENTED',
'INTERNAL',
'UNAVAILABLE',
'DATA_LOSS'
];
/**
* List of all call error names
* @const
* @type {Array.<string>}
*/
var callErrorNames = [
'OK',
'ERROR',
'NOT_ON_SERVER',
'NOT_ON_CLIENT',
'ALREADY_INVOKED',
'NOT_INVOKED',
'ALREADY_FINISHED',
'TOO_MANY_OPERATIONS',
'INVALID_FLAGS'
];
/**
* List of all op error names
* @const
* @type {Array.<string>}
*/
var opErrorNames = [
'OK',
'ERROR'
];
/**
* List of all completion type names
* @const
* @type {Array.<string>}
*/
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',
'FINISHED',
'SERVER_RPC_NEW'
];
describe('constants', function() {
it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) {
assert(grpc.status.hasOwnProperty(statusNames[i]),
'status missing: ' + statusNames[i]);
}
});
it('should have all of the call errors', function() {
for (var i = 0; i < callErrorNames.length; i++) {
assert(grpc.callError.hasOwnProperty(callErrorNames[i]),
'call error missing: ' + callErrorNames[i]);
}
});
it('should have all of the op errors', function() {
for (var i = 0; i < opErrorNames.length; i++) {
assert(grpc.opError.hasOwnProperty(opErrorNames[i]),
'op error missing: ' + opErrorNames[i]);
}
});
it('should have all of the completion types', function() {
for (var i = 0; i < completionTypeNames.length; i++) {
assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]),
'completion type missing: ' + completionTypeNames[i]);
}
});
});

@ -0,0 +1,25 @@
var assert = require("assert");
var grpc = require("../build/Release");
var status_names = [
"OK",
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"DEADLINE_EXCEEDED",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"UNAUTHENTICATED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS"
];
describe("constants", function() {
it("should have all of the status constants", function() {

@ -0,0 +1 @@
CONFIRMEDTESTKEY

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
Dfcog5wrJytaQ6UA0wE=
-----END CERTIFICATE-----

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
F98XJ7tIFfJq
-----END PRIVATE KEY-----

@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5
MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl
c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs
JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO
RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30
3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL
BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6
b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ
KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS
wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e
aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s=
-----END CERTIFICATE-----

@ -0,0 +1,168 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var port_picker = require('../port_picker');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
describe('end-to-end', function() {
it('should start and end a request without error', function(complete) {
port_picker.nextAvailablePort(function(port) {
var server = new grpc.Server();
var done = multiDone(function() {
complete();
server.shutdown();
}, 2);
server.addHttp2Port(port);
var channel = new grpc.Channel(port);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
});
},function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
done();
}, 0);
server.start();
server.requestCall(function(event) {
assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
var server_call = event.call;
assert.notEqual(server_call, null);
server_call.serverAccept(function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
}, 0);
server_call.serverEndInitialMetadata(0);
server_call.startWriteStatus(
grpc.status.OK,
status_text,
function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
});
});
});
it('should send and receive data without error', function(complete) {
port_picker.nextAvailablePort(function(port) {
var req_text = 'client_request';
var reply_text = 'server_response';
var server = new grpc.Server();
var done = multiDone(function() {
complete();
server.shutdown();
}, 6);
server.addHttp2Port(port);
var channel = new grpc.Channel(port);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'success';
var call = new grpc.Call(channel,
'dummy_method',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), reply_text);
done();
});
},function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
done();
}, 0);
server.start();
server.requestCall(function(event) {
assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
var server_call = event.call;
assert.notEqual(server_call, null);
server_call.serverAccept(function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
done();
});
server_call.serverEndInitialMetadata(0);
server_call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), req_text);
server_call.startWrite(
new Buffer(reply_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data,
grpc.opError.OK);
server_call.startWriteStatus(
grpc.status.OK,
status_text,
function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
});
});
});
});
});

@ -0,0 +1,72 @@
var assert = require('assert');
var grpc = require('../build/Release/grpc');
describe('end-to-end', function() {
it('should start and end a request without error', function() {
var event;
var client_queue = new grpc.CompletionQueue();
var server_queue = new grpc.CompletionQueue();
var server = new grpc.Server(server_queue);
server.addHttp2Port('localhost:9000');
var channel = new grpc.Channel('localhost:9000');
var deadline = Infinity;
var status_text = 'xyz';
var call = new grpc.Call(channel, 'dummy_method', deadline);
var tag = 1;
assert.strictEqual(call.startInvoke(client_queue, tag, tag, tag),
grpc.callError.OK);
var server_tag = 2;
// the client invocation was accepted
event = client_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event->getType(), grpc.completionType.INVOKE_ACCEPTED);
assert.strictEqual(call.writesDone(tag), grpc.callError.CALL_OK);
event = client_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.getData(), grpc.opError.OK);
// check that a server rpc new was recieved
assert(server.start());
assert.strictEqual(server.requestCall(server_tag, server_tag),
grpc.callError.OK);
event = server_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(), grpc.completionType.SERVER_RPC_NEW);
var server_call = event.getCall();
assert.notEqual(server_call, null);
assert.strictEqual(server_call.accept(server_queue, server_tag),
grpc.callError.OK);
// the server sends the status
assert.strictEqual(server_call.start_write_status(grpc.status.OK,
status_text,
server_tag),
grpc.callError.OK);
event = server_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.getData(), grpc.opError.OK);
// the client gets CLIENT_METADATA_READ
event = client_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(),
grpc.completionType.CLIENT_METADATA_READ);
// the client gets FINISHED
event = client_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(), grpc.completionType.FINISHED);
var status = event.getData();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
// the server gets FINISHED
event = client_queue.next(deadline);
assert.notEqual(event, null);
assert.strictEqual(event.getType(), grpc.completionType.FINISHED);
});
});

@ -0,0 +1,176 @@
var assert = require('assert');
var client = require('../surface_client.js');
var ProtoBuf = require('protobufjs');
var port_picker = require('../port_picker');
var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var math = builder.build('math');
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Serialize an object to a buffer
* @param {*} arg The object to serialize
* @return {Buffer} The serialized object
*/
function serialize(arg) {
return new Buffer(arg.encode().toBuffer());
}
/**
* Sends a Div request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {DivArg} argument The argument to the call. Should be serializable
* with serialize
* @param {function(?Error, value=)} The callback to for when the response is
* received
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var div = client.makeUnaryRequestFunction(
'/Math/Div',
serialize,
deserializeCls(math.DivReply));
/**
* Sends a Fib request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var fib = client.makeServerStreamRequestFunction(
'/Math/Fib',
serialize,
deserializeCls(math.Num));
/**
* Sends a Sum request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {function(?Error, value=)} The callback to for when the response is
* received
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var sum = client.makeClientStreamRequestFunction(
'/Math/Sum',
serialize,
deserializeCls(math.Num));
/**
* Sends a DivMany request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var divMany = client.makeBidiStreamRequestFunction(
'/Math/DivMany',
serialize,
deserializeCls(math.DivReply));
/**
* Channel to use to make requests to a running server.
*/
var channel;
/**
* Server to test against
*/
var server = require('../examples/math_server.js');
describe('Math client', function() {
before(function(done) {
port_picker.nextAvailablePort(function(port) {
server.bind(port).listen();
channel = new client.Channel(port);
done();
});
});
after(function() {
server.shutdown();
});
it('should handle a single request', function(done) {
var arg = new math.DivArgs({dividend: 7, divisor: 4});
var call = div(channel, arg, function handleDivResult(err, value) {
assert.ifError(err);
assert.equal(value.get('quotient'), 1);
assert.equal(value.get('remainder'), 3);
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
done();
});
});
it('should handle a server streaming request', function(done) {
var arg = new math.FibArgs({limit: 7});
var call = fib(channel, arg);
var expected_results = [1, 1, 2, 3, 5, 8, 13];
var next_expected = 0;
call.on('data', function checkResponse(value) {
assert.equal(value.get('num'), expected_results[next_expected]);
next_expected += 1;
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
done();
});
});
it('should handle a client streaming request', function(done) {
var call = sum(channel, function handleSumResult(err, value) {
assert.ifError(err);
assert.equal(value.get('num'), 21);
});
for (var i = 0; i < 7; i++) {
call.write(new math.Num({'num': i}));
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
done();
});
});
it('should handle a bidirectional streaming request', function(done) {
function checkResponse(index, value) {
assert.equal(value.get('quotient'), index);
assert.equal(value.get('remainder'), 1);
}
var call = divMany(channel);
var response_index = 0;
call.on('data', function(value) {
checkResponse(response_index, value);
response_index += 1;
});
for (var i = 0; i < 7; i++) {
call.write(new math.DivArgs({dividend: 2 * i + 1, divisor: 2}));
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
done();
});
});
});

@ -0,0 +1,87 @@
var client = require('../surface_client.js');
var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var math = builder.build('math');
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Serialize an object to a buffer
* @param {*} arg The object to serialize
* @return {Buffer} The serialized object
*/
function serialize(arg) {
return new Buffer(arg.encode.toBuffer());
}
/**
* Sends a Div request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {function(?Error, value=)} The callback to for when the response is
* received
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var div = client.makeUnaryRequestFunction('/Math/Div',
serialize,
deserialize(math.DivReply));
/**
* Sends a Fib request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var fib = client.makeServerStreamRequestFunction('/Math/Fib',
serialize,
deserialize(math.Num));
/**
* Sends a Sum request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {function(?Error, value=)} The callback to for when the response is
* received
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var sum = client.makeClientStreamRequestFunction('/Math/Sum',
serialize,
deserialize(math.Num));
/**
* Sends a DivMany request on the channel.
* @param {client.Channel} channel The channel on which to make the request
* @param {array=} Array of metadata key/value pairs to add to the call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
var divMany = client.makeBidiStreamRequestFunction('/Math/DivMany',
serialize,
deserialize(math.DivReply));
var channel = new client.Channel('localhost:7070');

@ -0,0 +1,88 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var Server = require('../server');
var port_picker = require('../port_picker');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
/**
* Responds to every request with the same data as a response
* @param {Stream} stream
*/
function echoHandler(stream) {
stream.pipe(stream);
}
describe('echo server', function() {
it('should echo inputs as responses', function(done) {
done = multiDone(done, 4);
port_picker.nextAvailablePort(function(port) {
var server = new Server();
server.bind(port);
server.register('echo', echoHandler);
server.start();
var req_text = 'echo test string';
var status_text = 'OK';
var channel = new grpc.Channel(port);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var call = new grpc.Call(channel,
'echo',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), req_text);
done();
});
},function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
server.shutdown();
done();
}, 0);
});
});
});

@ -0,0 +1,22 @@
var assert = require('assert');
var grpc = require('./build/Debug/grpc');
var Server = require('server');
function echoHandler(arg_iter) {
return {
'next' : function(write) {
arg_iter.next(function(err, value) {
write(err, value);
});
}
}
}
describe('echo server', function() {
it('should echo inputs as responses', function(done) {
var server = new Server('localhost:5000');
server.register('echo', echoHandler);
server.start();
});
});

@ -0,0 +1,33 @@
#include <limits>
#include "grpc/grpc.h"
#include "grpc/support/time.h"
#include "timeval.h"
namespace grpc {
namespace node {
gpr_timespec MillisecondsToTimespec(double millis) {
if (millis == std::numeric_limits<double>::infinity()) {
return gpr_inf_future;
} else if (millis == -std::numeric_limits<double>::infinity()) {
return gpr_inf_past;
} else {
return gpr_time_from_micros(static_cast<int64_t>(millis*1000));
}
}
double TimespecToMilliseconds(gpr_timespec timespec) {
if (gpr_time_cmp(timespec, gpr_inf_future) == 0) {
return std::numeric_limits<double>::infinity();
} else if (gpr_time_cmp(timespec, gpr_inf_past) == 0) {
return -std::numeric_limits<double>::infinity();
} else {
struct timeval time = gpr_timeval_from_timespec(timespec);
return (static_cast<double>(time.tv_sec) * 1000 +
static_cast<double>(time.tv_usec) / 1000);
}
}
} // namespace node
} // namespace grpc

@ -0,0 +1,15 @@
#ifndef NET_GRPC_NODE_TIMEVAL_H_
#define NET_GRPC_NODE_TIMEVAL_H_
#include "grpc/support/time.h"
namespace grpc {
namespace node {
double TimespecToMilliseconds(gpr_timespec time);
gpr_timespec MillisecondsToTimespec(double millis);
} // namespace node
} // namespace grpc
#endif // NET_GRPC_NODE_TIMEVAL_H_
Loading…
Cancel
Save