Merge remote-tracking branch 'upstream/master'

pull/145/head
murgatroid99 10 years ago
commit 749985eb8d
  1. 7
      src/core/surface/call.c
  2. 82
      src/node/common.js
  3. 41
      src/node/examples/math.proto
  4. 83
      src/node/examples/math_server.js
  5. 98
      src/node/main.js
  6. 9
      src/node/package.json
  7. 6
      src/node/server.js
  8. 34
      src/node/surface_client.js
  9. 65
      src/node/surface_server.js
  10. 86
      src/node/test/math_client_test.js
  11. 75
      src/node/test/surface_test.js
  12. 5
      test/core/end2end/dualstack_socket_test.c
  13. 3
      test/core/end2end/gen_build_json.py
  14. 4
      test/core/end2end/no_server_test.c
  15. 22
      test/core/end2end/tests/cancel_after_accept.c
  16. 22
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  17. 22
      test/core/end2end/tests/cancel_after_invoke.c
  18. 4
      test/core/end2end/tests/cancel_before_invoke.c
  19. 18
      test/core/end2end/tests/cancel_in_a_vacuum.c
  20. 52
      test/core/end2end/tests/cancel_test_helpers.h
  21. 4
      test/cpp/end2end/end2end_test.cc

@ -897,7 +897,12 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
if (success) {
grpc_call_cancel(call);
if (call->is_client) {
grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
} else {
grpc_call_cancel(call);
}
}
grpc_call_internal_unref(call);
}

@ -31,32 +31,68 @@
*
*/
var _ = require('highland');
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/**
* When the given stream finishes without error, call the callback once. This
* will not be called until something begins to consume the stream.
* @param {function} callback The callback to call at stream end
* @param {stream} source The stream to watch
* @return {stream} The stream with the callback attached
* Get the fully qualified (dotted) name of a ProtoBuf.Reflect value.
* @param {ProtoBuf.Reflect.Namespace} value The value to get the name of
* @return {string} The fully qualified name of the value
*/
function onSuccessfulStreamEnd(callback, source) {
var error = false;
return source.consume(function(err, x, push, next) {
if (x === _.nil) {
if (!error) {
callback();
}
push(null, x);
} else if (err) {
error = true;
push(err);
next();
} else {
push(err, x);
next();
function fullyQualifiedName(value) {
if (value === null || value === undefined) {
return '';
}
var name = value.name;
if (value.hasOwnProperty('parent')) {
var parent_name = fullyQualifiedName(value.parent);
if (parent_name !== '') {
name = parent_name + '.' + name;
}
});
}
return name;
}
exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd;
/**
* See docs for deserializeCls
*/
exports.deserializeCls = deserializeCls;
/**
* See docs for serializeCls
*/
exports.serializeCls = serializeCls;
/**
* See docs for fullyQualifiedName
*/
exports.fullyQualifiedName = fullyQualifiedName;

@ -1,15 +1,15 @@
syntax = "proto2";
syntax = "proto3";
package math;
message DivArgs {
required int64 dividend = 1;
required int64 divisor = 2;
optional int64 dividend = 1;
optional int64 divisor = 2;
}
message DivReply {
required int64 quotient = 1;
required int64 remainder = 2;
optional int64 quotient = 1;
optional int64 remainder = 2;
}
message FibArgs {
@ -17,9 +17,34 @@ message FibArgs {
}
message Num {
required int64 num = 1;
optional int64 num = 1;
}
message FibReply {
required int64 count = 1;
}
optional int64 count = 1;
}
service Math {
// Div divides args.dividend by args.divisor and returns the quotient and
// remainder.
rpc Div (DivArgs) returns (DivReply) {
}
// DivMany accepts an arbitrary number of division args from the client stream
// and sends back the results in the reply stream. The stream continues until
// the client closes its end; the server does the same after sending all the
// replies. The stream ends immediately if either end aborts.
rpc DivMany (stream DivArgs) returns (stream DivReply) {
}
// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
// generates up to limit numbers; otherwise it continues until the call is
// canceled. Unlike Fib above, Fib has no final FibReply.
rpc Fib (FibArgs) returns (stream Num) {
}
// Sum sums a stream of numbers, returning the final result once the stream
// is closed.
rpc Sum (stream Num) returns (Num) {
}
}

@ -38,77 +38,10 @@ var util = require('util');
var Transform = require('stream').Transform;
var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto');
var math = builder.build('math');
var grpc = require('..');
var math = grpc.load(__dirname + '/math.proto').math;
var makeConstructor = require('../surface_server.js').makeServerConstructor;
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/* This function call creates a server constructor for servers that that expose
* the four specified methods. This specifies how to serialize messages that the
* server sends and deserialize messages that the client sends, and whether the
* client or the server will send a stream of messages, for each method. This
* also specifies a prefix that will be added to method names when sending them
* on the wire. This function call and all of the preceding code in this file
* are intended to approximate what the generated code will look like for the
* math service */
var Server = makeConstructor({
Div: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: false,
server_stream: false
},
Fib: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.FibArgs),
client_stream: false,
server_stream: true
},
Sum: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.Num),
client_stream: true,
server_stream: false
},
DivMany: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: true,
server_stream: true
}
}, '/Math/');
var Server = grpc.buildServer([math.Math.service]);
/**
* Server function for division. Provides the /Math/DivMany and /Math/Div
@ -185,10 +118,12 @@ function mathDivMany(stream) {
}
var server = new Server({
Div: mathDiv,
Fib: mathFib,
Sum: mathSum,
DivMany: mathDivMany
'math.Math' : {
Div: mathDiv,
Fib: mathFib,
Sum: mathSum,
DivMany: mathDivMany
}
});
if (require.main === module) {

@ -0,0 +1,98 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var _ = require('underscore');
var ProtoBuf = require('protobufjs');
var surface_client = require('./surface_client.js');
var surface_server = require('./surface_server.js');
var grpc = require('bindings')('grpc');
/**
* Load a gRPC object from an existing ProtoBuf.Reflect object.
* @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load.
* @return {Object<string, *>} The resulting gRPC object
*/
function loadObject(value) {
var result = {};
if (value.className === 'Namespace') {
_.each(value.children, function(child) {
result[child.name] = loadObject(child);
});
return result;
} else if (value.className === 'Service') {
return surface_client.makeClientConstructor(value);
} else if (value.className === 'Service.Message') {
return value.build();
} else {
return value;
}
}
/**
* Load a gRPC object from a .proto file.
* @param {string} filename The file to load
* @return {Object<string, *>} The resulting gRPC object
*/
function load(filename) {
var builder = ProtoBuf.loadProtoFile(filename);
return loadObject(builder.ns);
}
/**
* See docs for loadObject
*/
exports.loadObject = loadObject;
/**
* See docs for load
*/
exports.load = load;
/**
* See docs for surface_server.makeServerConstructor
*/
exports.buildServer = surface_server.makeServerConstructor;
/**
* Status name to code number mapping
*/
exports.status = grpc.status;
/**
* Call error name to code number mapping
*/
exports.callError = grpc.callError;

@ -8,11 +8,12 @@
"dependencies": {
"bindings": "^1.2.1",
"nan": "~1.3.0",
"underscore": "^1.7.0"
"underscore": "^1.7.0",
"protobufjs": "murgatroid99/ProtoBuf.js"
},
"devDependencies": {
"mocha": "~1.21.0",
"highland": "~2.0.0",
"protobufjs": "~3.8.0"
}
"highland": "~2.0.0"
},
"main": "main.js"
}

@ -31,6 +31,8 @@
*
*/
var _ = require('underscore');
var grpc = require('bindings')('grpc.node');
var common = require('./common');
@ -176,6 +178,10 @@ function Server(options) {
* @this Server
*/
this.start = function() {
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
});
if (this.started) {
throw 'Server is already running';
}

@ -35,6 +35,8 @@ var _ = require('underscore');
var client = require('./client.js');
var common = require('./common.js');
var EventEmitter = require('events').EventEmitter;
var stream = require('stream');
@ -44,6 +46,7 @@ var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
function forwardEvent(fromEmitter, toEmitter, event) {
fromEmitter.on(event, function forward() {
_.partial(toEmitter.emit, event).apply(toEmitter, arguments);
@ -317,16 +320,13 @@ var requester_makers = {
}
/**
* Creates a constructor for clients with a service defined by the methods
* object. The methods object has string keys and values of this form:
* {serialize: function, deserialize: function, client_stream: bool,
* server_stream: bool}
* @param {!Object<string, Object>} methods Method descriptor for each method
* the client should expose
* @param {string} prefix The prefix to prepend to each method name
* Creates a constructor for clients for the given service
* @param {ProtoBuf.Reflect.Service} service The service to generate a client
* for
* @return {function(string, Object)} New client constructor
*/
function makeClientConstructor(methods, prefix) {
function makeClientConstructor(service) {
var prefix = '/' + common.fullyQualifiedName(service) + '/';
/**
* Create a client with the given methods
* @constructor
@ -337,27 +337,29 @@ function makeClientConstructor(methods, prefix) {
this.channel = new client.Channel(address, options);
}
_.each(methods, function(method, name) {
_.each(service.children, function(method) {
var method_type;
if (method.client_stream) {
if (method.server_stream) {
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.server_stream) {
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
SurfaceClient.prototype[name] = requester_makers[method_type](
prefix + name,
method.serialize,
method.deserialize);
SurfaceClient.prototype[method.name] = requester_makers[method_type](
prefix + method.name,
common.serializeCls(method.resolvedRequestType.build()),
common.deserializeCls(method.resolvedResponseType.build()));
});
SurfaceClient.service = service;
return SurfaceClient;
}

@ -42,6 +42,8 @@ var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
var common = require('./common.js');
util.inherits(ServerReadableObjectStream, Readable);
/**
@ -287,36 +289,59 @@ var handler_makers = {
* @param {string} prefix The prefex to prepend to each method name
* @return {function(Object, Object)} New server constructor
*/
function makeServerConstructor(methods, prefix) {
function makeServerConstructor(services) {
var qual_names = [];
_.each(services, function(service) {
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
});
/**
* Create a server with the given handlers for all of the methods.
* @constructor
* @param {Object} handlers Map from method names to method handlers.
* @param {Object} service_handlers Map from service names to map from method
* names to handlers
* @param {Object} options Options to pass to the underlying server
*/
function SurfaceServer(handlers, options) {
function SurfaceServer(service_handlers, options) {
var server = new Server(options);
this.inner_server = server;
_.each(handlers, function(handler, name) {
var method = methods[name];
var method_type;
if (method.client_stream) {
if (method.server_stream) {
method_type = 'bidi';
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
var prefix = '/' + common.fullyQualifiedName(service) + '/';
_.each(service.children, function(method) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
method_type = 'client_stream';
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
} else {
if (method.server_stream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
if (service_handlers[service_name][method.name] === undefined) {
throw new Error('Method handler for ' +
common.fullyQualifiedName(method) + ' not provided.');
}
}
var binary_handler = handler_makers[method_type](handler,
method.serialize,
method.deserialize);
server.register('' + prefix + name, binary_handler);
var binary_handler = handler_makers[method_type](
service_handlers[service_name][method.name],
common.serializeCls(method.resolvedResponseType.build()),
common.deserializeCls(method.resolvedRequestType.build()));
server.register(prefix + method.name, binary_handler);
});
}, this);
}

@ -32,83 +32,13 @@
*/
var assert = require('assert');
var ProtoBuf = require('protobufjs');
var port_picker = require('../port_picker');
var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var math = builder.build('math');
var grpc = require('..');
var math = grpc.load(__dirname + '/../examples/math.proto').math;
var client = require('../surface_client.js');
var makeConstructor = client.makeClientConstructor;
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/* This function call creates a client constructor for clients that expose the
* four specified methods. This specifies how to serialize messages that the
* client sends and deserialize messages that the server sends, and whether the
* client or the server will send a stream of messages, for each method. This
* also specifies a prefix that will be added to method names when sending them
* on the wire. This function call and all of the preceding code in this file
* are intended to approximate what the generated code will look like for the
* math client */
var MathClient = makeConstructor({
Div: {
serialize: serializeCls(math.DivArgs),
deserialize: deserializeCls(math.DivReply),
client_stream: false,
server_stream: false
},
Fib: {
serialize: serializeCls(math.FibArgs),
deserialize: deserializeCls(math.Num),
client_stream: false,
server_stream: true
},
Sum: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.Num),
client_stream: true,
server_stream: false
},
DivMany: {
serialize: serializeCls(math.DivArgs),
deserialize: deserializeCls(math.DivReply),
client_stream: true,
server_stream: true
}
}, '/Math/');
/**
* Channel to use to make requests to a running server.
* Client to use to make requests to a running server.
*/
var math_client;
@ -122,7 +52,7 @@ describe('Math client', function() {
before(function(done) {
port_picker.nextAvailablePort(function(port) {
server.bind(port).listen();
math_client = new MathClient(port);
math_client = new math.Math(port);
done();
});
});
@ -137,7 +67,7 @@ describe('Math client', function() {
assert.equal(value.remainder, 3);
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
@ -150,7 +80,7 @@ describe('Math client', function() {
next_expected += 1;
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
@ -164,7 +94,7 @@ describe('Math client', function() {
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
@ -184,7 +114,7 @@ describe('Math client', function() {
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});

@ -0,0 +1,75 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var assert = require('assert');
var surface_server = require('../surface_server.js');
var ProtoBuf = require('protobufjs');
var grpc = require('..');
var math_proto = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var mathService = math_proto.lookup('math.Math');
describe('Surface server constructor', function() {
it('Should fail with conflicting method names', function() {
assert.throws(function() {
grpc.buildServer([mathService, mathService]);
});
});
it('Should succeed with a single service', function() {
assert.doesNotThrow(function() {
grpc.buildServer([mathService]);
});
});
it('Should fail with missing handlers', function() {
var Server = grpc.buildServer([mathService]);
assert.throws(function() {
new Server({
'math.Math': {
'Div': function() {},
'DivMany': function() {},
'Fib': function() {}
}
});
}, /math.Math.Sum/);
});
it('Should fail with no handlers for the service', function() {
var Server = grpc.buildServer([mathService]);
assert.throws(function() {
new Server({});
}, /math.Math/);
});
});

@ -154,8 +154,9 @@ void test_connect(const char *server_host, const char *client_host, int port,
/* Check for a failed connection. */
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED,
NULL, NULL);
cq_expect_finished_with_status(v_client, tag(3),
GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", NULL);
cq_verify(v_client);
grpc_call_destroy(c);

@ -55,7 +55,8 @@ def main():
'name': 'end2end_test_%s' % t,
'build': 'private',
'secure': False,
'src': ['test/core/end2end/tests/%s.c' % t]
'src': ['test/core/end2end/tests/%s.c' % t],
'headers': ['test/core/end2end/tests/cancel_test_helpers.h']
}
for t in END2END_TESTS] + [
{

@ -62,8 +62,8 @@ int main(int argc, char **argv) {
/* verify that all tags get completed */
cq_expect_invoke_accepted(cqv, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(cqv, tag(2), NULL);
cq_expect_finished_with_status(cqv, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);
cq_expect_finished_with_status(cqv, tag(3), GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", NULL);
cq_verify(cqv);
grpc_completion_queue_shutdown(cq);

@ -43,14 +43,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
/* allow cancellation by either grpc_call_cancel, or by wait_for_deadline (which
* does nothing) */
typedef grpc_call_error (*canceller)(grpc_call *call);
static grpc_call_error wait_for_deadline(grpc_call *call) {
return GRPC_CALL_OK;
}
#include "test/core/end2end/tests/cancel_test_helpers.h"
enum { TIMEOUT = 200000 };
@ -112,7 +105,7 @@ static void end_test(grpc_end2end_test_fixture *f) {
/* Cancel after accept, no payload */
static void test_cancel_after_accept(grpc_end2end_test_config config,
canceller call_cancel) {
cancellation_mode mode) {
grpc_call *c;
grpc_call *s;
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
@ -138,10 +131,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);
cq_expect_finished_with_status(v_client, tag(3), mode.expect_status,
mode.expect_details, NULL);
cq_verify(v_client);
cq_expect_finished_with_status(v_server, tag(102), GRPC_STATUS_CANCELLED,
@ -159,9 +152,8 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
void grpc_end2end_tests(grpc_end2end_test_config config) {
int i;
canceller cancellers[2] = {grpc_call_cancel, wait_for_deadline};
for (i = 0; i < GPR_ARRAY_SIZE(cancellers); i++) {
test_cancel_after_accept(config, cancellers[i]);
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
test_cancel_after_accept(config, cancellation_modes[i]);
}
}

@ -43,14 +43,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
/* allow cancellation by either grpc_call_cancel, or by wait_for_deadline (which
* does nothing) */
typedef grpc_call_error (*canceller)(grpc_call *call);
static grpc_call_error wait_for_deadline(grpc_call *call) {
return GRPC_CALL_OK;
}
#include "test/core/end2end/tests/cancel_test_helpers.h"
enum { TIMEOUT = 200000 };
@ -112,7 +105,7 @@ static void end_test(grpc_end2end_test_fixture *f) {
/* Cancel after accept with a writes closed, no payload */
static void test_cancel_after_accept_and_writes_closed(
grpc_end2end_test_config config, canceller call_cancel) {
grpc_end2end_test_config config, cancellation_mode mode) {
grpc_call *c;
grpc_call *s;
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
@ -146,10 +139,10 @@ static void test_cancel_after_accept_and_writes_closed(
cq_expect_empty_read(v_server, tag(101));
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);
cq_expect_finished_with_status(v_client, tag(3), mode.expect_status,
mode.expect_details, NULL);
cq_verify(v_client);
cq_expect_finished_with_status(v_server, tag(102), GRPC_STATUS_CANCELLED,
@ -167,9 +160,8 @@ static void test_cancel_after_accept_and_writes_closed(
void grpc_end2end_tests(grpc_end2end_test_config config) {
int i;
canceller cancellers[2] = {grpc_call_cancel, wait_for_deadline};
for (i = 0; i < GPR_ARRAY_SIZE(cancellers); i++) {
test_cancel_after_accept_and_writes_closed(config, cancellers[i]);
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
test_cancel_after_accept_and_writes_closed(config, cancellation_modes[i]);
}
}

@ -43,14 +43,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
/* allow cancellation by either grpc_call_cancel, or by wait_for_deadline (which
* does nothing) */
typedef grpc_call_error (*canceller)(grpc_call *call);
static grpc_call_error wait_for_deadline(grpc_call *call) {
return GRPC_CALL_OK;
}
#include "test/core/end2end/tests/cancel_test_helpers.h"
enum { TIMEOUT = 200000 };
@ -112,7 +105,7 @@ static void end_test(grpc_end2end_test_fixture *f) {
/* Cancel after invoke, no payload */
static void test_cancel_after_invoke(grpc_end2end_test_config config,
canceller call_cancel) {
cancellation_mode mode) {
grpc_call *c;
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
gpr_timespec deadline = five_seconds_time();
@ -126,11 +119,11 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);
cq_expect_finished_with_status(v_client, tag(3), mode.expect_status,
mode.expect_details, NULL);
cq_verify(v_client);
grpc_call_destroy(c);
@ -142,9 +135,8 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
void grpc_end2end_tests(grpc_end2end_test_config config) {
int i;
canceller cancellers[2] = {grpc_call_cancel, wait_for_deadline};
for (i = 0; i < GPR_ARRAY_SIZE(cancellers); i++) {
test_cancel_after_invoke(config, cancellers[i]);
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
test_cancel_after_invoke(config, cancellation_modes[i]);
}
}

@ -44,10 +44,6 @@
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
/* allow cancellation by either grpc_call_cancel, or by wait_for_deadline (which
* does nothing) */
typedef grpc_call_error (*canceller)(grpc_call *call);
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }

@ -43,14 +43,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
/* allow cancellation by either grpc_call_cancel, or by wait_for_deadline (which
* does nothing) */
typedef grpc_call_error (*canceller)(grpc_call *call);
static grpc_call_error wait_for_deadline(grpc_call *call) {
return GRPC_CALL_OK;
}
#include "test/core/end2end/tests/cancel_test_helpers.h"
enum { TIMEOUT = 200000 };
@ -110,7 +103,7 @@ static void end_test(grpc_end2end_test_fixture *f) {
/* Cancel and do nothing */
static void test_cancel_in_a_vacuum(grpc_end2end_test_config config,
canceller call_cancel) {
cancellation_mode mode) {
grpc_call *c;
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
gpr_timespec deadline = five_seconds_time();
@ -119,7 +112,7 @@ static void test_cancel_in_a_vacuum(grpc_end2end_test_config config,
c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
grpc_call_destroy(c);
@ -130,9 +123,8 @@ static void test_cancel_in_a_vacuum(grpc_end2end_test_config config,
void grpc_end2end_tests(grpc_end2end_test_config config) {
int i;
canceller cancellers[2] = {grpc_call_cancel, wait_for_deadline};
for (i = 0; i < GPR_ARRAY_SIZE(cancellers); i++) {
test_cancel_in_a_vacuum(config, cancellers[i]);
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
test_cancel_in_a_vacuum(config, cancellation_modes[i]);
}
}

@ -0,0 +1,52 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_TEST_END2END_TESTS_CANCEL_TEST_HELPERS_H__
#define __GRPC_TEST_END2END_TESTS_CANCEL_TEST_HELPERS_H__
typedef struct {
grpc_call_error (*initiate_cancel)(grpc_call *call);
grpc_status_code expect_status;
const char *expect_details;
} cancellation_mode;
static grpc_call_error wait_for_deadline(grpc_call *call) {
return GRPC_CALL_OK;
}
static const cancellation_mode cancellation_modes[] = {
{grpc_call_cancel, GRPC_STATUS_CANCELLED, NULL},
{wait_for_deadline, GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded"},
};
#endif

@ -210,9 +210,7 @@ TEST_F(End2endTest, RpcDeadlineExpires) {
std::chrono::system_clock::now() + std::chrono::microseconds(10);
context.set_absolute_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
// TODO(yangg) use correct error code when b/18793983 is fixed.
// EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.code());
EXPECT_EQ(StatusCode::CANCELLED, s.code());
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.code());
}
// Set a long but finite deadline.

Loading…
Cancel
Save