diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index db383e4d000..5602011a8e0 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -290,6 +290,7 @@ function timeoutOnSleepingServer(client, done) { call.write({ payload: {body: zeroBuffer(27182)} }); + call.on('data', function() {}); call.on('error', function(error) { assert(error.code === grpc.status.DEADLINE_EXCEEDED || @@ -336,6 +337,7 @@ function customMetadata(client, done) { ['test_initial_metadata_value']); done(); }); + stream.on('data', function() {}); stream.on('status', function(status) { var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY); assert(echo_trailer.length > 0); @@ -361,6 +363,7 @@ function statusCodeAndMessage(client, done) { done(); }); var duplex = client.fullDuplexCall(); + duplex.on('data', function() {}); duplex.on('status', function(status) { assert(status); assert.strictEqual(status.code, 2); diff --git a/src/node/src/client.js b/src/node/src/client.js index c65dd736503..9acf51bd98b 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -131,8 +131,68 @@ function ClientReadableStream(call, deserialize) { this.finished = false; this.reading = false; this.deserialize = common.wrapIgnoreNull(deserialize); + /* Status generated from reading messages from the server. Overrides the + * status from the server if not OK */ + this.read_status = null; + /* Status received from the server. */ + this.received_status = null; } +/** + * Called when all messages from the server have been processed. The status + * parameter indicates that the call should end with that status. status + * defaults to OK if not provided. + * @param {Object!} status The status that the call should end with + */ +function _readsDone(status) { + /* jshint validthis: true */ + if (!status) { + status = {code: grpc.status.OK, details: 'OK'}; + } + this.finished = true; + this.read_status = status; + this._emitStatusIfDone(); +} + +ClientReadableStream.prototype._readsDone = _readsDone; + +/** + * Called to indicate that we have received a status from the server. + */ +function _receiveStatus(status) { + /* jshint validthis: true */ + this.received_status = status; + this._emitStatusIfDone(); +} + +ClientReadableStream.prototype._receiveStatus = _receiveStatus; + +/** + * If we have both processed all incoming messages and received the status from + * the server, emit the status. Otherwise, do nothing. + */ +function _emitStatusIfDone() { + /* jshint validthis: true */ + var status; + if (this.read_status && this.received_status) { + if (this.read_status.code !== grpc.status.OK) { + status = this.read_status; + } else { + status = this.received_status; + } + this.emit('status', status); + if (status.code !== grpc.status.OK) { + var error = new Error(status.details); + error.code = status.code; + error.metadata = status.metadata; + this.emit('error', error); + return; + } + } +} + +ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; + /** * Read the next object from the stream. * @access private @@ -150,6 +210,7 @@ function _read(size) { if (err) { // Something has gone wrong. Stop reading and wait for status self.finished = true; + self._readsDone(); return; } var data = event.read; @@ -157,8 +218,11 @@ function _read(size) { try { deserialized = self.deserialize(data); } catch (e) { - self.call.cancelWithStatus(grpc.status.INTERNAL, - 'Failed to parse server response'); + self._readsDone({code: grpc.status.INTERNAL, + details: 'Failed to parse server response'}); + } + if (data === null) { + self._readsDone(); } if (self.push(deserialized) && data !== null) { var read_batch = {}; @@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) { this.serialize = common.wrapIgnoreNull(serialize); this.deserialize = common.wrapIgnoreNull(deserialize); this.call = call; + /* Status generated from reading messages from the server. Overrides the + * status from the server if not OK */ + this.read_status = null; + /* Status received from the server. */ + this.received_status = null; this.on('finish', function() { var batch = {}; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) { }); } +ClientDuplexStream.prototype._readsDone = _readsDone; +ClientDuplexStream.prototype._receiveStatus = _receiveStatus; +ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone; ClientDuplexStream.prototype._read = _read; ClientDuplexStream.prototype._write = _write; @@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream.emit('status', response.status); - if (response.status.code !== grpc.status.OK) { - var error = new Error(response.status.details); - error.code = response.status.code; - error.metadata = response.status.metadata; - stream.emit('error', error); + if (err) { + stream.emit('error', err); return; - } else { - if (err) { - // Got a batch error, but OK status. Something went wrong - stream.emit('error', err); - return; - } } + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + stream._receiveStatus(response.status); }); return stream; } @@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream.emit('status', response.status); - if (response.status.code !== grpc.status.OK) { - var error = new Error(response.status.details); - error.code = response.status.code; - error.metadata = response.status.metadata; - stream.emit('error', error); + if (err) { + stream.emit('error', err); return; - } else { - if (err) { - // Got a batch error, but OK status. Something went wrong - stream.emit('error', err); - return; - } } + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + stream._receiveStatus(response.status); }); return stream; } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 530f1f77494..8a232d6fc44 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -1000,6 +1000,7 @@ describe('Call propagation', function() { proxy_impl.serverStream = function(parent) { var child = client.serverStream(parent.request, null, {parent: parent}); + child.on('data', function() {}); child.on('error', function(err) { assert(err); assert.strictEqual(err.code, grpc.status.CANCELLED); @@ -1013,6 +1014,7 @@ describe('Call propagation', function() { var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); call = proxy_client.serverStream({}); + call.on('data', function() {}); call.on('error', function(err) { done(); }); @@ -1022,6 +1024,7 @@ describe('Call propagation', function() { var call; proxy_impl.bidiStream = function(parent) { var child = client.bidiStream(null, {parent: parent}); + child.on('data', function() {}); child.on('error', function(err) { assert(err); assert.strictEqual(err.code, grpc.status.CANCELLED); @@ -1035,6 +1038,7 @@ describe('Call propagation', function() { var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); call = proxy_client.bidiStream(); + call.on('data', function() {}); call.on('error', function(err) { done(); }); @@ -1074,6 +1078,7 @@ describe('Call propagation', function() { proxy_impl.bidiStream = function(parent) { var child = client.bidiStream( null, {parent: parent, propagate_flags: deadline_flags}); + child.on('data', function() {}); child.on('error', function(err) { assert(err); assert(err.code === grpc.status.DEADLINE_EXCEEDED || @@ -1089,6 +1094,7 @@ describe('Call propagation', function() { var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 1); var call = proxy_client.bidiStream(null, {deadline: deadline}); + call.on('data', function() {}); call.on('error', function(err) { done(); }); @@ -1130,6 +1136,7 @@ describe('Cancelling surface client', function() { }); it('Should correctly cancel a server stream call', function(done) { var call = client.fib({'limit': 5}); + call.on('data', function() {}); call.on('error', function(error) { assert.strictEqual(error.code, surface_client.status.CANCELLED); done(); @@ -1138,6 +1145,7 @@ describe('Cancelling surface client', function() { }); it('Should correctly cancel a bidi stream call', function(done) { var call = client.divMany(); + call.on('data', function() {}); call.on('error', function(error) { assert.strictEqual(error.code, surface_client.status.CANCELLED); done();