Merge branch 'master' into debug_qps_stream

pull/5444/head
Vijay Pai 9 years ago
commit 5dfc013d9e
  1. examples/cpp/README.md (15 changed lines)
  2. src/core/iomgr/pollset_multipoller_with_poll_posix.c (7 changed lines)
  3. src/node/interop/interop_client.js (3 changed lines)
  4. src/node/src/client.js (114 changed lines)
  5. src/node/test/surface_test.js (8 changed lines)

examples/cpp/README.md
@@ -23,21 +23,6 @@ Change your current directory to examples/cpp/helloworld
$ cd examples/cpp/helloworld/
```
### Generating gRPC code
To generate the client and server side interfaces:
```sh
$ make helloworld.grpc.pb.cc helloworld.pb.cc
```
Which internally invokes the proto-compiler as:
```sh
$ protoc -I ../../protos/ --grpc_out=. --plugin=protoc-gen-grpc=grpc_cpp_plugin ../../protos/helloworld.proto
$ protoc -I ../../protos/ --cpp_out=. ../../protos/helloworld.proto
```
### Client and server implementations
The client implementation is at [greeter_client.cc](helloworld/greeter_client.cc).

src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -122,6 +122,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
} else {
h->fds[fd_count++] = h->fds[i];
watchers[pfd_count].fd = h->fds[i];
GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
pfds[pfd_count].fd = h->fds[i]->fd;
pfds[pfd_count].revents = 0;
pfd_count++;
@@ -135,8 +136,10 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
gpr_mu_unlock(&pollset->mu);
for (i = 2; i < pfd_count; i++) {
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker,
POLLIN, POLLOUT, &watchers[i]);
grpc_fd *fd = watchers[i].fd;
pfds[i].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
POLLOUT, &watchers[i]);
GRPC_FD_UNREF(fd, "multipoller_start");
}
/* TODO(vpai): Consider first doing a 0 timeout poll here to avoid

src/node/interop/interop_client.js
@@ -290,6 +290,7 @@ function timeoutOnSleepingServer(client, done) {
call.write({
payload: {body: zeroBuffer(27182)}
});
call.on('data', function() {});
call.on('error', function(error) {
assert(error.code === grpc.status.DEADLINE_EXCEEDED ||
@@ -336,6 +337,7 @@ function customMetadata(client, done) {
['test_initial_metadata_value']);
done();
});
stream.on('data', function() {});
stream.on('status', function(status) {
var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
assert(echo_trailer.length > 0);
@@ -361,6 +363,7 @@ function statusCodeAndMessage(client, done) {
done();
});
var duplex = client.fullDuplexCall();
duplex.on('data', function() {});
duplex.on('status', function(status) {
assert(status);
assert.strictEqual(status.code, 2);

src/node/src/client.js
@@ -131,8 +131,68 @@ function ClientReadableStream(call, deserialize) {
this.finished = false;
this.reading = false;
this.deserialize = common.wrapIgnoreNull(deserialize);
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
/* Status received from the server. */
this.received_status = null;
}
/**
* Called when all messages from the server have been processed. The status
* parameter indicates that the call should end with that status. status
* defaults to OK if not provided.
* @param {Object!} status The status that the call should end with
*/
function _readsDone(status) {
/* jshint validthis: true */
if (!status) {
status = {code: grpc.status.OK, details: 'OK'};
}
this.finished = true;
this.read_status = status;
this._emitStatusIfDone();
}
ClientReadableStream.prototype._readsDone = _readsDone;
/**
* Called to indicate that we have received a status from the server.
*/
function _receiveStatus(status) {
/* jshint validthis: true */
this.received_status = status;
this._emitStatusIfDone();
}
ClientReadableStream.prototype._receiveStatus = _receiveStatus;
/**
* If we have both processed all incoming messages and received the status from
* the server, emit the status. Otherwise, do nothing.
*/
function _emitStatusIfDone() {
/* jshint validthis: true */
var status;
if (this.read_status && this.received_status) {
if (this.read_status.code !== grpc.status.OK) {
status = this.read_status;
} else {
status = this.received_status;
}
this.emit('status', status);
if (status.code !== grpc.status.OK) {
var error = new Error(status.details);
error.code = status.code;
error.metadata = status.metadata;
this.emit('error', error);
return;
}
}
}
ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
/**
* Read the next object from the stream.
* @access private
@@ -150,6 +210,7 @@ function _read(size) {
if (err) {
// Something has gone wrong. Stop reading and wait for status
self.finished = true;
self._readsDone();
return;
}
var data = event.read;
@@ -157,8 +218,11 @@ function _read(size) {
try {
deserialized = self.deserialize(data);
} catch (e) {
self.call.cancelWithStatus(grpc.status.INTERNAL,
'Failed to parse server response');
self._readsDone({code: grpc.status.INTERNAL,
details: 'Failed to parse server response'});
}
if (data === null) {
self._readsDone();
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
@@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize);
this.call = call;
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
/* Status received from the server. */
this.received_status = null;
this.on('finish', function() {
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
});
}
ClientDuplexStream.prototype._readsDone = _readsDone;
ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
ClientDuplexStream.prototype._read = _read;
ClientDuplexStream.prototype._write = _write;
@@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
if (err) {
stream.emit('error', err);
return;
} else {
if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
}
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
}
@@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
if (err) {
stream.emit('error', err);
return;
} else {
if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
}
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
}
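
Taken together, the client.js changes above defer status emission until the incoming message stream has been drained: `_receiveStatus` records the status delivered by the `RECV_STATUS_ON_CLIENT` batch, `_readsDone` records a read-side status (defaulting to OK, or INTERNAL when a response fails to parse), and `_emitStatusIfDone` emits 'status' (plus 'error' for non-OK codes) only once both are set, letting a non-OK read-side status override an OK status from the server. The sketch below is illustrative and not part of this commit; `waitForStatus` and its `done` callback are hypothetical names. It shows the consumer-side consequence that motivates the no-op 'data' handlers added throughout the interop client above and the surface tests below: if nothing reads the data, the reads never finish and the status is never emitted.

```js
// Illustrative only (not from this commit): how a caller observes the final
// status of a streaming call under the deferred-emission scheme in client.js.
function waitForStatus(call, done) {
  // Drain incoming messages; without a reader, _readsDone never runs and the
  // 'status' event is never emitted.
  call.on('data', function() {});
  // For non-OK codes client.js emits 'error' in addition to 'status'; attach a
  // no-op handler so the EventEmitter does not throw on the unhandled 'error'.
  call.on('error', function() {});
  call.on('status', function(status) {
    // Fires once the server status has arrived AND all messages were read.
    // A non-OK read-side status (e.g. INTERNAL on a parse failure) takes
    // precedence over an OK status from the server.
    done(status);
  });
}
```

Used as, say, `waitForStatus(client.someStreamingCall(request), console.log)` with any generated streaming stub (a hypothetical method name, not one defined in this commit).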

src/node/test/surface_test.js
@@ -1000,6 +1000,7 @@ describe('Call propagation', function() {
proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null,
{parent: parent});
child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
@@ -1013,6 +1014,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
call = proxy_client.serverStream({});
call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1022,6 +1024,7 @@ describe('Call propagation', function() {
var call;
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert.strictEqual(err.code, grpc.status.CANCELLED);
@@ -1035,6 +1038,7 @@ describe('Call propagation', function() {
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
call = proxy_client.bidiStream();
call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1074,6 +1078,7 @@ describe('Call propagation', function() {
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(
null, {parent: parent, propagate_flags: deadline_flags});
child.on('data', function() {});
child.on('error', function(err) {
assert(err);
assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
@@ -1089,6 +1094,7 @@ describe('Call propagation', function() {
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.bidiStream(null, {deadline: deadline});
call.on('data', function() {});
call.on('error', function(err) {
done();
});
@@ -1130,6 +1136,7 @@ describe('Cancelling surface client', function() {
});
it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5});
call.on('data', function() {});
call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED);
done();
@@ -1138,6 +1145,7 @@ describe('Cancelling surface client', function() {
});
it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany();
call.on('data', function() {});
call.on('error', function(error) {
assert.strictEqual(error.code, surface_client.status.CANCELLED);
done();
