|
|
|
@ -100,28 +100,6 @@ function handleError(call, error) { |
|
|
|
|
call.startBatch(error_batch, function(){}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Wait for the client to close, then emit a cancelled event if the client |
|
|
|
|
* cancelled. |
|
|
|
|
* @access private |
|
|
|
|
* @param {grpc.Call} call The call object to wait on |
|
|
|
|
* @param {EventEmitter} emitter The event emitter to emit the cancelled event |
|
|
|
|
* on |
|
|
|
|
*/ |
|
|
|
|
function waitForCancel(call, emitter) { |
|
|
|
|
var cancel_batch = {}; |
|
|
|
|
cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; |
|
|
|
|
call.startBatch(cancel_batch, function(err, result) { |
|
|
|
|
if (err) { |
|
|
|
|
emitter.emit('error', err); |
|
|
|
|
} |
|
|
|
|
if (result.cancelled) { |
|
|
|
|
emitter.cancelled = true; |
|
|
|
|
emitter.emit('cancelled'); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Send a response to a unary or client streaming call. |
|
|
|
|
* @access private |
|
|
|
@ -258,6 +236,13 @@ function setUpReadable(stream, deserialize) { |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ServerUnaryCall, EventEmitter); |
|
|
|
|
|
|
|
|
|
function ServerUnaryCall(call) { |
|
|
|
|
EventEmitter.call(this); |
|
|
|
|
this.call = call; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ServerWritableStream, Writable); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -311,33 +296,6 @@ function _write(chunk, encoding, callback) { |
|
|
|
|
|
|
|
|
|
ServerWritableStream.prototype._write = _write; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Send the initial metadata for a writable stream. |
|
|
|
|
* @param {Metadata} responseMetadata Metadata to send |
|
|
|
|
*/ |
|
|
|
|
function sendMetadata(responseMetadata) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
var self = this; |
|
|
|
|
if (!this.call.metadataSent) { |
|
|
|
|
this.call.metadataSent = true; |
|
|
|
|
var batch = []; |
|
|
|
|
batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
responseMetadata._getCoreRepresentation(); |
|
|
|
|
this.call.startBatch(batch, function(err) { |
|
|
|
|
if (err) { |
|
|
|
|
self.emit('error', err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @inheritdoc |
|
|
|
|
* @alias module:src/server~ServerWritableStream#sendMetadata |
|
|
|
|
*/ |
|
|
|
|
ServerWritableStream.prototype.sendMetadata = sendMetadata; |
|
|
|
|
|
|
|
|
|
util.inherits(ServerReadableStream, Readable); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -427,6 +385,31 @@ function ServerDuplexStream(call, serialize, deserialize) { |
|
|
|
|
|
|
|
|
|
ServerDuplexStream.prototype._read = _read; |
|
|
|
|
ServerDuplexStream.prototype._write = _write; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Send the initial metadata for a writable stream. |
|
|
|
|
* @param {Metadata} responseMetadata Metadata to send |
|
|
|
|
*/ |
|
|
|
|
function sendMetadata(responseMetadata) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
var self = this; |
|
|
|
|
if (!this.call.metadataSent) { |
|
|
|
|
this.call.metadataSent = true; |
|
|
|
|
var batch = {}; |
|
|
|
|
batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
responseMetadata._getCoreRepresentation(); |
|
|
|
|
this.call.startBatch(batch, function(err) { |
|
|
|
|
if (err) { |
|
|
|
|
self.emit('error', err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ServerUnaryCall.prototype.sendMetadata = sendMetadata; |
|
|
|
|
ServerWritableStream.prototype.sendMetadata = sendMetadata; |
|
|
|
|
ServerReadableStream.prototype.sendMetadata = sendMetadata; |
|
|
|
|
ServerDuplexStream.prototype.sendMetadata = sendMetadata; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -438,10 +421,36 @@ function getPeer() { |
|
|
|
|
return this.call.getPeer(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ServerUnaryCall.prototype.getPeer = getPeer; |
|
|
|
|
ServerReadableStream.prototype.getPeer = getPeer; |
|
|
|
|
ServerWritableStream.prototype.getPeer = getPeer; |
|
|
|
|
ServerDuplexStream.prototype.getPeer = getPeer; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Wait for the client to close, then emit a cancelled event if the client |
|
|
|
|
* cancelled. |
|
|
|
|
*/ |
|
|
|
|
function waitForCancel() { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
var self = this; |
|
|
|
|
var cancel_batch = {}; |
|
|
|
|
cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; |
|
|
|
|
self.call.startBatch(cancel_batch, function(err, result) { |
|
|
|
|
if (err) { |
|
|
|
|
self.emit('error', err); |
|
|
|
|
} |
|
|
|
|
if (result.cancelled) { |
|
|
|
|
self.cancelled = true; |
|
|
|
|
self.emit('cancelled'); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ServerUnaryCall.prototype.waitForCancel = waitForCancel; |
|
|
|
|
ServerReadableStream.prototype.waitForCancel = waitForCancel; |
|
|
|
|
ServerWritableStream.prototype.waitForCancel = waitForCancel; |
|
|
|
|
ServerDuplexStream.prototype.waitForCancel = waitForCancel; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Fully handle a unary call |
|
|
|
|
* @access private |
|
|
|
@ -450,25 +459,12 @@ ServerDuplexStream.prototype.getPeer = getPeer; |
|
|
|
|
* @param {Metadata} metadata Metadata from the client |
|
|
|
|
*/ |
|
|
|
|
function handleUnary(call, handler, metadata) { |
|
|
|
|
var emitter = new EventEmitter(); |
|
|
|
|
emitter.sendMetadata = function(responseMetadata) { |
|
|
|
|
if (!call.metadataSent) { |
|
|
|
|
call.metadataSent = true; |
|
|
|
|
var batch = {}; |
|
|
|
|
batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
responseMetadata._getCoreRepresentation(); |
|
|
|
|
call.startBatch(batch, function() {}); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
emitter.getPeer = function() { |
|
|
|
|
return call.getPeer(); |
|
|
|
|
}; |
|
|
|
|
var emitter = new ServerUnaryCall(call); |
|
|
|
|
emitter.on('error', function(error) { |
|
|
|
|
handleError(call, error); |
|
|
|
|
}); |
|
|
|
|
emitter.metadata = metadata; |
|
|
|
|
waitForCancel(call, emitter); |
|
|
|
|
emitter.call = call; |
|
|
|
|
emitter.waitForCancel(); |
|
|
|
|
var batch = {}; |
|
|
|
|
batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
|
call.startBatch(batch, function(err, result) { |
|
|
|
@ -508,7 +504,7 @@ function handleUnary(call, handler, metadata) { |
|
|
|
|
*/ |
|
|
|
|
function handleServerStreaming(call, handler, metadata) { |
|
|
|
|
var stream = new ServerWritableStream(call, handler.serialize); |
|
|
|
|
waitForCancel(call, stream); |
|
|
|
|
stream.waitForCancel(); |
|
|
|
|
stream.metadata = metadata; |
|
|
|
|
var batch = {}; |
|
|
|
|
batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
@ -537,19 +533,10 @@ function handleServerStreaming(call, handler, metadata) { |
|
|
|
|
*/ |
|
|
|
|
function handleClientStreaming(call, handler, metadata) { |
|
|
|
|
var stream = new ServerReadableStream(call, handler.deserialize); |
|
|
|
|
stream.sendMetadata = function(responseMetadata) { |
|
|
|
|
if (!call.metadataSent) { |
|
|
|
|
call.metadataSent = true; |
|
|
|
|
var batch = {}; |
|
|
|
|
batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
responseMetadata._getCoreRepresentation(); |
|
|
|
|
call.startBatch(batch, function() {}); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
stream.on('error', function(error) { |
|
|
|
|
handleError(call, error); |
|
|
|
|
}); |
|
|
|
|
waitForCancel(call, stream); |
|
|
|
|
stream.waitForCancel(); |
|
|
|
|
stream.metadata = metadata; |
|
|
|
|
handler.func(stream, function(err, value, trailer, flags) { |
|
|
|
|
stream.terminate(); |
|
|
|
@ -574,7 +561,7 @@ function handleClientStreaming(call, handler, metadata) { |
|
|
|
|
function handleBidiStreaming(call, handler, metadata) { |
|
|
|
|
var stream = new ServerDuplexStream(call, handler.serialize, |
|
|
|
|
handler.deserialize); |
|
|
|
|
waitForCancel(call, stream); |
|
|
|
|
stream.waitForCancel(); |
|
|
|
|
stream.metadata = metadata; |
|
|
|
|
handler.func(stream); |
|
|
|
|
} |
|
|
|
|