Merge pull request #69 from murgatroid99/node_new_invoke_api2

Updated to new call.invoke API
pull/81/head^2
Craig Tiller 10 years ago
commit d7c16868a5
  1. 31
      src/node/call.cc
  2. 2
      src/node/call.h
  3. 99
      src/node/client.js
  4. 2
      src/node/node_grpc.cc
  5. 42
      src/node/test/call_test.js
  6. 1
      src/node/test/constant_test.js
  7. 57
      src/node/test/end_to_end_test.js
  8. 41
      src/node/test/server_test.js

@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
NanSetPrototypeTemplate(tpl, "startInvoke",
FunctionTemplate::New(StartInvoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "invoke",
FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined();
}
NAN_METHOD(Call::StartInvoke) {
NAN_METHOD(Call::Invoke) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects");
return NanThrowTypeError("invoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
return NanThrowTypeError("StartInvoke's first argument must be a function");
return NanThrowTypeError("invoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
return NanThrowTypeError(
"StartInvoke's second argument must be a function");
}
if (!args[2]->IsFunction()) {
return NanThrowTypeError("StartInvoke's third argument must be a function");
return NanThrowTypeError("invoke's second argument must be a function");
}
if (!args[3]->IsUint32()) {
return NanThrowTypeError(
"StartInvoke's fourth argument must be integer flags");
if (!args[2]->IsUint32()) {
return NanThrowTypeError("invoke's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
grpc_call_error error = grpc_call_start_invoke(
grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
CreateTag(args[2], args.This()), flags);
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
} else {
return NanThrowError("startInvoke failed", error);
return NanThrowError("invoke failed", error);
}
NanReturnUndefined();
}
@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects");
return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);

@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
static NAN_METHOD(StartInvoke);
static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);

@ -50,101 +50,53 @@ util.inherits(GrpcClientStream, Duplex);
function GrpcClientStream(call, options) {
Duplex.call(this, options);
var self = this;
// Indicates that we can start reading and have not received a null read
var can_read = false;
var finished = false;
// Indicates that a read is currently pending
var reading = false;
// Indicates that we can call startWrite
var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
/**
* Callback to handle receiving a READ event. Pushes the data from that event
* onto the read queue and starts reading again if applicable.
* @param {grpc.Event} event The READ event object
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
function readCallback(event) {
if (finished) {
self.push(null);
return;
}
var data = event.data;
if (self.push(data)) {
if (data == null) {
// Disable starting to read after null read was received
can_read = false;
reading = false;
} else {
call.startRead(readCallback);
}
if (self.push(data) && data != null) {
self._call.startRead(readCallback);
} else {
// Indicate that reading can be resumed by calling startReading
reading = false;
}
};
/**
* Initiate a read, which continues until self.push returns false (indicating
* that reading should be paused) or data is null (indicating that there is no
* more data to read).
*/
function startReading() {
call.startRead(readCallback);
}
// TODO(mlumish): possibly change queue implementation due to shift slowness
var write_queue = [];
/**
* Write the next chunk of data in the write queue if there is one. Otherwise
* indicate that there is no pending write. When the write succeeds, this
* function is called again.
*/
function writeNext() {
if (write_queue.length > 0) {
writing = true;
var next = write_queue.shift();
var writeCallback = function(event) {
next.callback();
writeNext();
};
call.startWrite(next.chunk, writeCallback, 0);
} else {
writing = false;
}
}
call.startInvoke(function(event) {
can_read = true;
can_write = true;
startReading();
writeNext();
}, function(event) {
call.invoke(function(event) {
self.emit('metadata', event.data);
}, function(event) {
finished = true;
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
* Indicate that reads should start, and start them if the INVOKE_ACCEPTED
* event has been received.
* Start reading if there is not already a pending read. Reading will
* continue until self.push returns false (indicating reads should slow
* down) or the read data is null (indicating that there is no more data).
*/
this._enableRead = function() {
if (!reading) {
reading = true;
if (can_read) {
startReading();
this.startReading = function() {
if (finished) {
self.push(null);
} else {
if (!reading) {
reading = true;
self._call.startRead(readCallback);
}
}
};
/**
* Push the chunk onto the write queue, and write from the write queue if
* there is not a pending write
* @param {Buffer} chunk The chunk of data to write
* @param {function(Error=)} callback The callback to call when the write
* completes
*/
this._tryWrite = function(chunk, callback) {
write_queue.push({chunk: chunk, callback: callback});
if (can_write && !writing) {
writeNext();
}
};
}
/**
@ -153,7 +105,7 @@ function GrpcClientStream(call, options) {
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
this._enableRead();
this.startReading();
};
/**
@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
this._tryWrite(chunk, callback);
var self = this;
self._call.startWrite(chunk, function(event) {
callback();
}, 0);
};
/**

@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));

@ -118,12 +118,11 @@ describe('call', function() {
call.addMetadata(5);
}, TypeError);
});
it('should fail if startInvoke was already called', function(done) {
it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {},
function() {},
function() {done();},
0);
call.invoke(function() {},
function() {done();},
0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
@ -133,32 +132,26 @@ describe('call', function() {
call.cancel();
});
});
describe('startInvoke', function() {
it('should fail with fewer than 4 arguments', function() {
describe('invoke', function() {
it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.startInvoke();
call.invoke();
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {});
call.invoke(function() {});
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {});
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {},
function() {});
call.invoke(function() {},
function() {});
}, TypeError);
});
it('should work with 3 args and an int', function(done) {
it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {},
function() {},
function() {done();},
0);
call.invoke(function() {},
function() {done();},
0);
// Cancel to speed up the test
call.cancel();
});
@ -166,12 +159,11 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
call.startInvoke(0, 0, 0, 0);
call.invoke(0, 0, 0);
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {},
function() {}, 'test');
call.invoke(function() {},
function() {}, 'test');
});
});
});

@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',

@ -72,16 +72,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
});
},function(event) {
call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
@ -91,6 +82,11 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
});
server.start();
server.requestCall(function(event) {
@ -131,28 +127,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), reply_text);
done();
});
},function(event) {
call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@ -163,6 +138,24 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), reply_text);
done();
});
server.start();
server.requestCall(function(event) {

@ -83,28 +83,7 @@ describe('echo server', function() {
var call = new grpc.Call(channel,
'echo',
deadline);
call.startInvoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), req_text);
done();
});
},function(event) {
call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@ -116,6 +95,24 @@ describe('echo server', function() {
server.shutdown();
done();
}, 0);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), req_text);
done();
});
});
});
});

Loading…
Cancel
Save