Started adding support for trailing metadata

pull/1140/head
murgatroid99 10 years ago
parent 96e32aa0a5
commit d07a278ef4
  1. 4
      src/node/src/client.js
  2. 43
      src/node/src/server.js
  3. 115
      src/node/test/surface_test.js

@ -241,13 +241,13 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(err);
return;
}
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
callback(error);
return;
}
emitter.emit('status', response.status);
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
});
@ -312,13 +312,13 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(err);
return;
}
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
callback(error);
return;
}
stream.emit('status', response.status);
callback(null, deserialize(response.read));
});
});

@ -70,6 +70,9 @@ function handleError(call, error) {
status.details = error.details;
}
}
if (error.hasOwnProperty('metadata')) {
status.metadata = error.metadata;
}
var error_batch = {};
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
@ -102,15 +105,20 @@ function waitForCancel(call, emitter) {
* @param {*} value The value to respond with
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
*/
function sendUnaryResponse(call, value, serialize) {
function sendUnaryResponse(call, value, serialize, metadata) {
var end_batch = {};
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
var status = {
code: grpc.status.OK,
details: 'OK',
metadata: {}
};
if (metadata) {
status.metadata = metadata;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@ -143,6 +151,7 @@ function setUpWritable(stream, serialize) {
function setStatus(err) {
var code = grpc.status.INTERNAL;
var details = 'Unknown Error';
var metadata = {};
if (err.hasOwnProperty('message')) {
details = err.message;
}
@ -152,7 +161,10 @@ function setUpWritable(stream, serialize) {
details = err.details;
}
}
stream.status = {code: code, details: details, metadata: {}};
if (err.hasOwnProperty('metadata')) {
metadata = err.metadata;
}
stream.status = {code: code, details: details, metadata: metadata};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
@ -166,6 +178,17 @@ function setUpWritable(stream, serialize) {
stream.end();
}
stream.on('error', terminateCall);
/**
* Override of Writable#end method that allows for sending metadata with a
* success status.
* @param {Object=} metadata Metadata to send with the status
*/
stream.end = function(metadata) {
if (metadata) {
stream.status.metadata = metadata;
}
Writable.prototype.end.call(this);
};
}
/**
@ -335,11 +358,13 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
handler.func(emitter, function sendUnaryData(err, value) {
handler.func(emitter, function sendUnaryData(err, value, trailer) {
if (err) {
err.metadata = trailer;
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
}
sendUnaryResponse(call, value, handler.serialize);
});
});
}
@ -378,12 +403,14 @@ function handleClientStreaming(call, handler, metadata) {
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
handler.func(stream, function(err, value) {
handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
err.metadata = trailer;
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
}
sendUnaryResponse(call, value, handler.serialize);
});
}

@ -126,6 +126,121 @@ describe('Generic client and server', function() {
});
});
});
describe.only('Trailing metadata', function() {
var client;
var server;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
var Server = grpc.buildServer([test_service]);
server = new Server({
TestService: {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
cb(new Error('Requested error'), null, {metadata: ['yes']});
} else {
cb(null, {count: 1}, {metadata: ['yes']});
}
},
clientStream: function(stream, cb){
var count = 0;
stream.on('data', function(data) {
if (data.error) {
cb(new Error('Requested error'), null, {metadata: ['yes']});
} else {
count += 1;
}
});
stream.on('end', function() {
cb(null, {count: count}, {metadata: ['yes']});
});
},
serverStream: function(stream) {
var req = stream.request;
if (req.error) {
var err = new Error('Requested error');
err.metadata = {metadata: ['yes']};
stream.emit(err);
} else {
for (var i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end({metadata: ['yes']});
}
},
bidiStream: function(stream) {
var count = 0;
stream.on('data', function(data) {
if (data.error) {
var err = new Error('Requested error');
err.metadata = {
metadata: 'yes',
count: '' + count
};
stream.emit('error', err);
} else {
stream.write({count: count});
count += 1;
}
});
stream.on('end', function() {
stream.end({metadata: ['yes']});
});
}
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port);
server.listen();
});
after(function() {
server.shutdown();
});
it('when a unary call succeeds', function(done) {
var call = client.unary({error: false}, function(err, data) {
assert.ifError(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
it('when a unary call fails', function(done) {
var call = client.unary({error: true}, function(err, data) {
assert(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
it('when a client stream call succeeds', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
});
call.write({error: false});
call.write({error: false});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
it('when a client stream call fails', function(done) {
var call = client.clientStream(function(err, data) {
assert(err);
});
call.write({error: false});
call.write({error: true});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
});
describe('Cancelling surface client', function() {
var client;
var server;

Loading…
Cancel
Save