Add parent call propagation API to Node library

pull/2942/head
murgatroid99 9 years ago
parent 118f65dc8c
commit 99e2104758
  1. 20
      src/node/ext/call.cc
  2. 20
      src/node/ext/node_grpc.cc
  3. 7
      src/node/src/client.js
  4. 1
      src/node/src/server.js
  5. 18
      src/node/test/constant_test.js
  6. 183
      src/node/test/surface_test.js

@ -502,6 +502,22 @@ NAN_METHOD(Call::New) {
return NanThrowTypeError( return NanThrowTypeError(
"Call's third argument must be a date or a number"); "Call's third argument must be a date or a number");
} }
// These arguments are at the end because they are optional
grpc_call *parent_call = NULL;
if (Call::HasInstance(args[4])) {
Call *parent_obj = ObjectWrap::Unwrap<Call>(args[4]->ToObject());
parent_call = parent_obj->wrapped_call;
} else if (!(args[4]->IsUndefined() || args[4]->IsNull())) {
return NanThrowTypeError(
"Call's fifth argument must be another call, if provided");
}
gpr_uint32 propagate_flags = GRPC_PROPAGATE_DEFAULTS;
if (args[5]->IsUint32()) {
propagate_flags = args[5]->Uint32Value();
} else if (!(args[5]->IsUndefined() || args[5]->IsNull())) {
return NanThrowTypeError(
"Call's fifth argument must be propagate flags, if provided");
}
Handle<Object> channel_object = args[0]->ToObject(); Handle<Object> channel_object = args[0]->ToObject();
Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object); Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
if (channel->GetWrappedChannel() == NULL) { if (channel->GetWrappedChannel() == NULL) {
@ -514,12 +530,12 @@ NAN_METHOD(Call::New) {
if (args[3]->IsString()) { if (args[3]->IsString()) {
NanUtf8String host_override(args[3]); NanUtf8String host_override(args[3]);
wrapped_call = grpc_channel_create_call( wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS, wrapped_channel, parent_call, propagate_flags,
CompletionQueueAsyncWorker::GetQueue(), *method, CompletionQueueAsyncWorker::GetQueue(), *method,
*host_override, MillisecondsToTimespec(deadline)); *host_override, MillisecondsToTimespec(deadline));
} else if (args[3]->IsUndefined() || args[3]->IsNull()) { } else if (args[3]->IsUndefined() || args[3]->IsNull()) {
wrapped_call = grpc_channel_create_call( wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS, wrapped_channel, parent_call, propagate_flags,
CompletionQueueAsyncWorker::GetQueue(), *method, CompletionQueueAsyncWorker::GetQueue(), *method,
NULL, MillisecondsToTimespec(deadline)); NULL, MillisecondsToTimespec(deadline));
} else { } else {

@ -159,12 +159,32 @@ void InitOpTypeConstants(Handle<Object> exports) {
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
} }
void InitPropagateConstants(Handle<Object> exports) {
NanScope();
Handle<Object> propagate = NanNew<Object>();
exports->Set(NanNew("propagate"), propagate);
Handle<Value> DEADLINE(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEADLINE));
propagate->Set(NanNew("DEADLINE"), DEADLINE);
Handle<Value> CENSUS_STATS_CONTEXT(
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
propagate->Set(NanNew("CENSUS_STATS_CONTEXT"), CENSUS_STATS_CONTEXT);
Handle<Value> CENSUS_TRACING_CONTEXT(
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
propagate->Set(NanNew("CENSUS_TRACING_CONTEXT"), CENSUS_TRACING_CONTEXT);
Handle<Value> CANCELLATION(
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CANCELLATION));
propagate->Set(NanNew("CANCELLATION"), CANCELLATION);
Handle<Value> DEFAULTS(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEFAULTS));
propagate->Set(NanNew("DEFAULTS"), DEFAULTS);
}
void init(Handle<Object> exports) { void init(Handle<Object> exports) {
NanScope(); NanScope();
grpc_init(); grpc_init();
InitStatusConstants(exports); InitStatusConstants(exports);
InitCallErrorConstants(exports); InitCallErrorConstants(exports);
InitOpTypeConstants(exports); InitOpTypeConstants(exports);
InitPropagateConstants(exports);
grpc::node::Call::Init(exports); grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports); grpc::node::Channel::Init(exports);

@ -216,14 +216,19 @@ ClientDuplexStream.prototype.getPeer = getPeer;
function getCall(channel, method, options) { function getCall(channel, method, options) {
var deadline; var deadline;
var host; var host;
var parent;
var propagate_flags;
if (options) { if (options) {
deadline = options.deadline; deadline = options.deadline;
host = options.host; host = options.host;
parent = _.get(options, 'parent.call');
propagate_flags = options.propagate_flags;
} }
if (deadline === undefined) { if (deadline === undefined) {
deadline = Infinity; deadline = Infinity;
} }
return new grpc.Call(channel, method, deadline, host); return new grpc.Call(channel, method, deadline, host,
parent, propagate_flags);
} }
/** /**

@ -432,6 +432,7 @@ function handleUnary(call, handler, metadata) {
}); });
emitter.metadata = metadata; emitter.metadata = metadata;
waitForCancel(call, emitter); waitForCancel(call, emitter);
emitter.call = call;
var batch = {}; var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true; batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) { call.startBatch(batch, function(err, result) {

@ -78,6 +78,18 @@ var callErrorNames = [
'INVALID_FLAGS' 'INVALID_FLAGS'
]; ];
/**
* List of all propagate flag names
* @const
* @type {Array.<string>}
*/
var propagateFlagNames = [
'DEADLINE',
'CENSUS_STATS_CONTEXT',
'CENSUS_TRACING_CONTEXT',
'CANCELLATION'
];
describe('constants', function() { describe('constants', function() {
it('should have all of the status constants', function() { it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) { for (var i = 0; i < statusNames.length; i++) {
@ -91,4 +103,10 @@ describe('constants', function() {
'call error missing: ' + callErrorNames[i]); 'call error missing: ' + callErrorNames[i]);
} }
}); });
it('should have all of the propagate flags', function() {
for (var i = 0; i < propagateFlagNames.length; i++) {
assert(grpc.propagate.hasOwnProperty(propagateFlagNames[i]),
'call error missing: ' + propagateFlagNames[i]);
}
});
}); });

@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math');
var _ = require('lodash'); var _ = require('lodash');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
var server_insecure_creds = grpc.ServerCredentials.createInsecure(); var server_insecure_creds = grpc.ServerCredentials.createInsecure();
describe('File loader', function() { describe('File loader', function() {
@ -272,12 +293,14 @@ describe('Echo metadata', function() {
}); });
}); });
describe('Other conditions', function() { describe('Other conditions', function() {
var test_service;
var Client;
var client; var client;
var server; var server;
var port; var port;
before(function() { before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService'); test_service = test_proto.lookup('TestService');
server = new grpc.Server(); server = new grpc.Server();
server.addProtoService(test_service, { server.addProtoService(test_service, {
unary: function(call, cb) { unary: function(call, cb) {
@ -339,7 +362,7 @@ describe('Other conditions', function() {
} }
}); });
port = server.bind('localhost:0', server_insecure_creds); port = server.bind('localhost:0', server_insecure_creds);
var Client = surface_client.makeProtobufClientConstructor(test_service); Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
server.start(); server.start();
}); });
@ -592,6 +615,162 @@ describe('Other conditions', function() {
}); });
}); });
}); });
describe('Call propagation', function() {
var proxy;
var proxy_impl;
beforeEach(function() {
proxy = new grpc.Server();
proxy_impl = {
unary: function(call) {},
clientStream: function(stream) {},
serverStream: function(stream) {},
bidiStream: function(stream) {}
};
});
afterEach(function() {
console.log('Shutting down server');
proxy.shutdown();
});
describe('Cancellation', function() {
it('With a unary call', function(done) {
done = multiDone(done, 2);
proxy_impl.unary = function(parent, callback) {
client.unary(parent.request, function(err, value) {
try {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
} finally {
callback(err, value);
done();
}
}, null, {parent: parent});
call.cancel();
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var call = proxy_client.unary({}, function(err, value) {
done();
});
});
it('With a client stream call', function(done) {
done = multiDone(done, 2);
proxy_impl.clientStream = function(parent, callback) {
client.clientStream(function(err, value) {
try {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
} finally {
callback(err, value);
done();
}
}, null, {parent: parent});
call.cancel();
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var call = proxy_client.clientStream(function(err, value) {
done();
});
});
it('With a server stream call', function(done) {
done = multiDone(done, 2);
proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null,
{parent: parent});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
done();
});
call.cancel();
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var call = proxy_client.serverStream({});
call.on('error', function(err) {
done();
});
});
it('With a bidi stream call', function(done) {
done = multiDone(done, 2);
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
done();
});
call.cancel();
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var call = proxy_client.bidiStream();
call.on('error', function(err) {
done();
});
});
});
describe('Deadline', function() {
it.skip('With a client stream call', function(done) {
done = multiDone(done, 2);
proxy_impl.clientStream = function(parent, callback) {
client.clientStream(function(err, value) {
try {
assert(err);
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
} finally {
callback(err, value);
done();
}
}, null, {parent: parent});
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.clientStream(function(err, value) {
done();
}, null, {deadline: deadline});
});
it.skip('With a bidi stream call', function(done) {
done = multiDone(done, 2);
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
};
proxy.addProtoService(test_service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.bidiStream(null, {deadline: deadline});
call.on('error', function(err) {
done();
});
});
});
});
}); });
describe('Cancelling surface client', function() { describe('Cancelling surface client', function() {
var client; var client;

Loading…
Cancel
Save