Fix race between parsing messages and receiving status in Node client

pull/5610/head
murgatroid99 9 years ago
parent 929ddb01e3
commit 9adecb06e0
  1. 3
      src/node/interop/interop_client.js
  2. 106
      src/node/src/client.js
  3. 8
      src/node/test/surface_test.js

@ -290,6 +290,7 @@ function timeoutOnSleepingServer(client, done) {
call.write({ call.write({
payload: {body: zeroBuffer(27182)} payload: {body: zeroBuffer(27182)}
}); });
call.on('data', function() {});
call.on('error', function(error) { call.on('error', function(error) {
assert(error.code === grpc.status.DEADLINE_EXCEEDED || assert(error.code === grpc.status.DEADLINE_EXCEEDED ||
@ -336,6 +337,7 @@ function customMetadata(client, done) {
['test_initial_metadata_value']); ['test_initial_metadata_value']);
done(); done();
}); });
stream.on('data', function() {});
stream.on('status', function(status) { stream.on('status', function(status) {
var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY); var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
assert(echo_trailer.length > 0); assert(echo_trailer.length > 0);
@ -361,6 +363,7 @@ function statusCodeAndMessage(client, done) {
done(); done();
}); });
var duplex = client.fullDuplexCall(); var duplex = client.fullDuplexCall();
duplex.on('data', function() {});
duplex.on('status', function(status) { duplex.on('status', function(status) {
assert(status); assert(status);
assert.strictEqual(status.code, 2); assert.strictEqual(status.code, 2);

@ -131,8 +131,68 @@ function ClientReadableStream(call, deserialize) {
this.finished = false; this.finished = false;
this.reading = false; this.reading = false;
this.deserialize = common.wrapIgnoreNull(deserialize); this.deserialize = common.wrapIgnoreNull(deserialize);
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
/* Status received from the server. */
this.received_status = null;
} }
/**
* Called when all messages from the server have been processed. The status
* parameter indicates that the call should end with that status. status
* defaults to OK if not provided.
* @param {Object!} status The status that the call should end with
*/
function _readsDone(status) {
/* jshint validthis: true */
if (!status) {
status = {code: grpc.status.OK, details: 'OK'};
}
this.finished = true;
this.read_status = status;
this._emitStatusIfDone();
}
ClientReadableStream.prototype._readsDone = _readsDone;
/**
* Called to indicate that we have received a status from the server.
*/
function _receiveStatus(status) {
/* jshint validthis: true */
this.received_status = status;
this._emitStatusIfDone();
}
ClientReadableStream.prototype._receiveStatus = _receiveStatus;
/**
* If we have both processed all incoming messages and received the status from
* the server, emit the status. Otherwise, do nothing.
*/
function _emitStatusIfDone() {
/* jshint validthis: true */
var status;
if (this.read_status && this.received_status) {
if (this.read_status.code !== grpc.status.OK) {
status = this.read_status;
} else {
status = this.received_status;
}
this.emit('status', status);
if (status.code !== grpc.status.OK) {
var error = new Error(status.details);
error.code = status.code;
error.metadata = status.metadata;
this.emit('error', error);
return;
}
}
}
ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
/** /**
* Read the next object from the stream. * Read the next object from the stream.
* @access private * @access private
@ -150,6 +210,7 @@ function _read(size) {
if (err) { if (err) {
// Something has gone wrong. Stop reading and wait for status // Something has gone wrong. Stop reading and wait for status
self.finished = true; self.finished = true;
self._readsDone();
return; return;
} }
var data = event.read; var data = event.read;
@ -157,8 +218,11 @@ function _read(size) {
try { try {
deserialized = self.deserialize(data); deserialized = self.deserialize(data);
} catch (e) { } catch (e) {
self.call.cancelWithStatus(grpc.status.INTERNAL, self._readsDone({code: grpc.status.INTERNAL,
'Failed to parse server response'); details: 'Failed to parse server response'});
}
if (data === null) {
self._readsDone();
} }
if (self.push(deserialized) && data !== null) { if (self.push(deserialized) && data !== null) {
var read_batch = {}; var read_batch = {};
@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
this.serialize = common.wrapIgnoreNull(serialize); this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize); this.deserialize = common.wrapIgnoreNull(deserialize);
this.call = call; this.call = call;
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
/* Status received from the server. */
this.received_status = null;
this.on('finish', function() { this.on('finish', function() {
var batch = {}; var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
}); });
} }
ClientDuplexStream.prototype._readsDone = _readsDone;
ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
ClientDuplexStream.prototype._read = _read; ClientDuplexStream.prototype._read = _read;
ClientDuplexStream.prototype._write = _write; ClientDuplexStream.prototype._write = _write;
@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {}; var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) { call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
} else {
if (err) { if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err); stream.emit('error', err);
return; return;
} }
} response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
}); });
return stream; return stream;
} }
@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {}; var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) { call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
} else {
if (err) { if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err); stream.emit('error', err);
return; return;
} }
} response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
}); });
return stream; return stream;
} }

@ -1000,6 +1000,7 @@ describe('Call propagation', function() {
proxy_impl.serverStream = function(parent) { proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null, var child = client.serverStream(parent.request, null,
{parent: parent}); {parent: parent});
child.on('data', function() {});
child.on('error', function(err) { child.on('error', function(err) {
assert(err); assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED); assert.strictEqual(err.code, grpc.status.CANCELLED);
@ -1013,6 +1014,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port, var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure()); grpc.credentials.createInsecure());
call = proxy_client.serverStream({}); call = proxy_client.serverStream({});
call.on('data', function() {});
call.on('error', function(err) { call.on('error', function(err) {
done(); done();
}); });
@ -1022,6 +1024,7 @@ describe('Call propagation', function() {
var call; var call;
proxy_impl.bidiStream = function(parent) { proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent}); var child = client.bidiStream(null, {parent: parent});
child.on('data', function() {});
child.on('error', function(err) { child.on('error', function(err) {
assert(err); assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED); assert.strictEqual(err.code, grpc.status.CANCELLED);
@ -1035,6 +1038,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port, var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure()); grpc.credentials.createInsecure());
call = proxy_client.bidiStream(); call = proxy_client.bidiStream();
call.on('data', function() {});
call.on('error', function(err) { call.on('error', function(err) {
done(); done();
}); });
@ -1074,6 +1078,7 @@ describe('Call propagation', function() {
proxy_impl.bidiStream = function(parent) { proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream( var child = client.bidiStream(
null, {parent: parent, propagate_flags: deadline_flags}); null, {parent: parent, propagate_flags: deadline_flags});
child.on('data', function() {});
child.on('error', function(err) { child.on('error', function(err) {
assert(err); assert(err);
assert(err.code === grpc.status.DEADLINE_EXCEEDED || assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
@ -1089,6 +1094,7 @@ describe('Call propagation', function() {
var deadline = new Date(); var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1); deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.bidiStream(null, {deadline: deadline}); var call = proxy_client.bidiStream(null, {deadline: deadline});
call.on('data', function() {});
call.on('error', function(err) { call.on('error', function(err) {
done(); done();
}); });
@ -1130,6 +1136,7 @@ describe('Cancelling surface client', function() {
}); });
it('Should correctly cancel a server stream call', function(done) { it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5}); var call = client.fib({'limit': 5});
call.on('data', function() {});
call.on('error', function(error) { call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED); assert.strictEqual(error.code, surface_client.status.CANCELLED);
done(); done();
@ -1138,6 +1145,7 @@ describe('Cancelling surface client', function() {
}); });
it('Should correctly cancel a bidi stream call', function(done) { it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany(); var call = client.divMany();
call.on('data', function() {});
call.on('error', function(error) { call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED); assert.strictEqual(error.code, surface_client.status.CANCELLED);
done(); done();

Loading…
Cancel
Save