Wrap connectivity API, expose it to client as waitForReady

pull/2696/head
murgatroid99 10 years ago
parent 8e06c2e62e
commit c7f4d4fb84
  1. 80
      src/node/ext/channel.cc
  2. 2
      src/node/ext/channel.h
  3. 2
      src/node/ext/completion_queue_async_worker.cc
  4. 19
      src/node/ext/node_grpc.cc
  5. 41
      src/node/src/client.js
  6. 88
      src/node/test/channel_test.js
  7. 19
      src/node/test/constant_test.js
  8. 76
      src/node/test/surface_test.js

@ -33,12 +33,17 @@
#include <vector>
#include "grpc/support/log.h"
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "call.h"
#include "channel.h"
#include "completion_queue_async_worker.h"
#include "credentials.h"
#include "timeval.h"
namespace grpc {
namespace node {
@ -51,11 +56,31 @@ using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
class ConnectivityStateOp : public Op {
public:
Handle<Value> GetNodeValue() const {
return NanNew<Number>(new_state);
}
bool ParseOp(Handle<Value> value, grpc_op *out,
shared_ptr<Resources> resources) {
return true;
}
grpc_connectivity_state new_state;
protected:
std::string GetTypeString() const {
return "new_state";
}
};
NanCallback *Channel::constructor;
Persistent<FunctionTemplate> Channel::fun_tpl;
@ -78,6 +103,12 @@ void Channel::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(Close)->GetFunction());
NanSetPrototypeTemplate(tpl, "getTarget",
NanNew<FunctionTemplate>(GetTarget)->GetFunction());
NanSetPrototypeTemplate(
tpl, "getConnectivityState",
NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction());
NanSetPrototypeTemplate(
tpl, "watchConnectivityState",
NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
constructor = new NanCallback(ctr);
@ -196,5 +227,54 @@ NAN_METHOD(Channel::GetTarget) {
NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel)));
}
NAN_METHOD(Channel::GetConnectivityState) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"getConnectivityState can only be called on Channel objects");
}
Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
int try_to_connect = (int)args[0]->Equals(NanTrue());
NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel,
try_to_connect));
}
NAN_METHOD(Channel::WatchConnectivityState) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError(
"watchConnectivityState can only be called on Channel objects");
}
if (!args[0]->IsUint32()) {
return NanThrowTypeError(
"watchConnectivityState's first argument must be a channel state");
}
if (!(args[1]->IsNumber() || args[1]->IsDate())) {
return NanThrowTypeError(
"watchConnectivityState's second argument must be a date or a number");
}
if (!args[2]->IsFunction()) {
return NanThrowTypeError(
"watchConnectivityState's third argument must be a callback");
}
grpc_connectivity_state last_state =
static_cast<grpc_connectivity_state>(args[0]->Uint32Value());
double deadline = args[1]->NumberValue();
Handle<Function> callback_func = args[2].As<Function>();
NanCallback *callback = new NanCallback(callback_func);
Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
ConnectivityStateOp *op = new ConnectivityStateOp();
unique_ptr<OpVec> ops(new OpVec());
ops->push_back(unique_ptr<Op>(op));
grpc_channel_watch_connectivity_state(
channel->wrapped_channel, last_state, &op->new_state,
MillisecondsToTimespec(deadline), CompletionQueueAsyncWorker::GetQueue(),
new struct tag(callback,
ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}
} // namespace node
} // namespace grpc

@ -67,6 +67,8 @@ class Channel : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(Close);
static NAN_METHOD(GetTarget);
static NAN_METHOD(GetConnectivityState);
static NAN_METHOD(WatchConnectivityState);
static NanCallback *constructor;
static v8::Persistent<v8::FunctionTemplate> fun_tpl;

@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() {
result =
grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME));
if (!result.success) {
SetErrorMessage("The batch encountered an error");
SetErrorMessage("The asnyc function encountered an error");
}
}

@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle<Object> exports) {
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
}
void InitConnectivityStateConstants(Handle<Object> exports) {
NanScope();
Handle<Object> channel_state = NanNew<Object>();
exports->Set(NanNew("connectivityState"), channel_state);
Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE));
channel_state->Set(NanNew("IDLE"), IDLE);
Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING));
channel_state->Set(NanNew("CONNECTING"), CONNECTING);
Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY));
channel_state->Set(NanNew("READY"), READY);
Handle<Value> TRANSIENT_FAILURE(
NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE));
channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE);
Handle<Value> FATAL_FAILURE(
NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE));
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
}
void init(Handle<Object> exports) {
NanScope();
grpc_init();
InitStatusConstants(exports);
InitCallErrorConstants(exports);
InitOpTypeConstants(exports);
InitConnectivityStateConstants(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);

@ -551,6 +551,47 @@ exports.makeClientConstructor = function(methods, serviceName) {
this.updateMetadata = updateMetadata;
}
/**
* Wait for the client to be ready. The callback will be called when the
* client has successfully connected to the server, and it will be called
* with an error if the attempt to connect to the server has unrecoverablly
* failed or if the deadline expires. This function does not automatically
* attempt to initiate the connection, so the callback will not be called
* unless you also start a method call or call $tryConnect.
* @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
* Infinity to wait forever.
* @param {function(Error)} callback The callback to call when done attempting
* to connect.
*/
Client.prototype.$waitForReady = function(deadline, callback) {
var self = this;
var checkState = function(err, result) {
if (err) {
callback(new Error('Failed to connect before the deadline'));
}
var new_state = result.new_state;
console.log(result);
if (new_state === grpc.connectivityState.READY) {
callback();
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
callback(new Error('Failed to connect to server'));
} else {
self.channel.watchConnectivityState(new_state, deadline, checkState);
}
};
checkState(null, {new_state: this.channel.getConnectivityState()});
};
/**
* Attempt to connect to the server. That will happen automatically if
* you try to make a method call with this client, so this function should
* only be used if you want to know that you have a connection before making
* any calls.
*/
Client.prototype.$tryConnect = function() {
this.channel.getConnectivityState(true);
};
_.each(methods, function(attrs, name) {
var method_type;
if (attrs.requestStream) {

@ -36,6 +36,27 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
describe('channel', function() {
describe('constructor', function() {
it('should require a string for the first argument', function() {
@ -73,14 +94,16 @@ describe('channel', function() {
});
});
describe('close', function() {
var channel;
beforeEach(function() {
channel = new grpc.Channel('hostname', {});
});
it('should succeed silently', function() {
var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
});
});
it('should be idempotent', function() {
var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
channel.close();
@ -88,9 +111,68 @@ describe('channel', function() {
});
});
describe('getTarget', function() {
var channel;
beforeEach(function() {
channel = new grpc.Channel('hostname', {});
});
it('should return a string', function() {
var channel = new grpc.Channel('localhost', {});
assert.strictEqual(typeof channel.getTarget(), 'string');
});
});
describe('getConnectivityState', function() {
var channel;
beforeEach(function() {
channel = new grpc.Channel('hostname', {});
});
it('should return IDLE for a new channel', function() {
assert.strictEqual(channel.getConnectivityState(),
grpc.connectivityState.IDLE);
});
});
describe('watchConnectivityState', function() {
var channel;
beforeEach(function() {
channel = new grpc.Channel('localhost', {});
});
afterEach(function() {
channel.close();
});
it('should time out if called alone', function(done) {
var old_state = channel.getConnectivityState();
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
channel.watchConnectivityState(old_state, deadline, function(err, value) {
assert(err);
done();
});
});
it('should complete if a connection attempt is forced', function(done) {
var old_state = channel.getConnectivityState();
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
channel.watchConnectivityState(old_state, deadline, function(err, value) {
assert.ifError(err);
assert.notEqual(value.new_state, old_state);
done();
});
channel.getConnectivityState(true);
});
it('should complete twice if called twice', function(done) {
done = multiDone(done, 2);
var old_state = channel.getConnectivityState();
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
channel.watchConnectivityState(old_state, deadline, function(err, value) {
assert.ifError(err);
assert.notEqual(value.new_state, old_state);
done();
});
channel.watchConnectivityState(old_state, deadline, function(err, value) {
assert.ifError(err);
assert.notEqual(value.new_state, old_state);
done();
});
channel.getConnectivityState(true);
});
});
});

@ -78,6 +78,19 @@ var callErrorNames = [
'INVALID_FLAGS'
];
/**
* List of all connectivity state names
* @const
* @type {Array.<string>}
*/
var connectivityStateNames = [
'IDLE',
'CONNECTING',
'READY',
'TRANSIENT_FAILURE',
'FATAL_FAILURE'
];
describe('constants', function() {
it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) {
@ -91,4 +104,10 @@ describe('constants', function() {
'call error missing: ' + callErrorNames[i]);
}
});
it('should have all of the connectivity states', function() {
for (var i = 0; i < connectivityStateNames.length; i++) {
assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]),
'connectivity status missing: ' + connectivityStateNames[i]);
}
});
});

@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math');
var _ = require('lodash');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
describe('File loader', function() {
it('Should load a proto file by default', function() {
assert.doesNotThrow(function() {
@ -110,6 +131,61 @@ describe('Server.prototype.addProtoService', function() {
});
});
});
describe('Client#$waitForReady', function() {
var server;
var port;
var Client;
var client;
before(function() {
server = new grpc.Server();
port = server.bind('localhost:0');
server.start();
Client = surface_client.makeProtobufClientConstructor(mathService);
});
beforeEach(function() {
client = new Client('localhost:' + port);
});
after(function() {
server.shutdown();
});
it('should complete when a call is initiated', function(done) {
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
done();
});
var call = client.div({}, function(err, response) {});
call.cancel();
});
it('should complete if $tryConnect is called', function(done) {
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
done();
});
client.$tryConnect();
});
it('should complete if called more than once', function(done) {
done = multiDone(done, 2);
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
done();
});
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
done();
});
client.$tryConnect();
});
it('should complete if called when already ready', function(done) {
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
client.$waitForReady(Infinity, function(error) {
assert.ifError(error);
done();
});
});
client.$tryConnect();
});
});
describe('Echo service', function() {
var server;
var client;

Loading…
Cancel
Save