The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
277 lines
8.4 KiB
277 lines
8.4 KiB
// Copyright 2015, Google Inc. |
|
// All rights reserved. |
|
// |
|
// Redistribution and use in source and binary forms, with or without |
|
// modification, are permitted provided that the following conditions are |
|
// met: |
|
// |
|
// * Redistributions of source code must retain the above copyright |
|
// notice, this list of conditions and the following disclaimer. |
|
// * Redistributions in binary form must reproduce the above |
|
// copyright notice, this list of conditions and the following disclaimer |
|
// in the documentation and/or other materials provided with the |
|
// distribution. |
|
// * Neither the name of Google Inc. nor the names of its |
|
// contributors may be used to endorse or promote products derived from |
|
// this software without specific prior written permission. |
|
// |
|
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
|
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
|
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
|
|
var async = require('async'); |
|
var fs = require('fs'); |
|
var GoogleAuth = require('googleauth'); |
|
var parseArgs = require('minimist'); |
|
var strftime = require('strftime'); |
|
var _ = require('underscore'); |
|
var grpc = require('../..'); |
|
var PROTO_PATH = __dirname + '/pubsub.proto'; |
|
var pubsub = grpc.load(PROTO_PATH).tech.pubsub; |
|
|
|
function PubsubRunner(pub, sub, args) { |
|
this.pub = pub; |
|
this.sub = sub; |
|
this.args = args; |
|
} |
|
|
|
PubsubRunner.prototype.getTestTopicName = function() { |
|
var base_name = '/topics/' + this.args.project_id + '/'; |
|
if (this.args.topic_name) { |
|
return base_name + this.args.topic_name; |
|
} |
|
var now_text = strftime('%Y%m%d%H%M%S%L'); |
|
return base_name + process.env.USER + '-' + now_text; |
|
}; |
|
|
|
PubsubRunner.prototype.getTestSubName = function() { |
|
var base_name = '/subscriptions/' + this.args.project_id + '/'; |
|
if (this.args.sub_name) { |
|
return base_name + this.args.sub_name; |
|
} |
|
var now_text = strftime('%Y%m%d%H%M%S%L'); |
|
return base_name + process.env.USER + '-' + now_text; |
|
}; |
|
|
|
PubsubRunner.prototype.listProjectTopics = function(callback) { |
|
var q = ('cloud.googleapis.com/project in (/projects/' + |
|
this.args.project_id + ')'); |
|
this.pub.listTopics({query: q}, callback); |
|
}; |
|
|
|
PubsubRunner.prototype.topicExists = function(name, callback) { |
|
this.listProjectTopics(function(err, response) { |
|
if (err) { |
|
callback(err); |
|
} else { |
|
callback(null, _.some(response.topic, function(t) { |
|
return t.name === name; |
|
})); |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { |
|
var self = this; |
|
this.topicExists(name, function(err, exists) { |
|
if (err) { |
|
callback(err); |
|
} else{ |
|
if (exists) { |
|
callback(null); |
|
} else { |
|
self.pub.createTopic({name: name}, callback); |
|
} |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.removeTopic = function(callback) { |
|
var name = this.getTestTopicName(); |
|
console.log('... removing Topic', name); |
|
this.pub.deleteTopic({topic: name}, function(err, value) { |
|
if (err) { |
|
console.log('Could not delete a topic: rpc failed with', err); |
|
callback(err); |
|
} else { |
|
console.log('removed Topic', name, 'OK'); |
|
callback(null); |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.createTopic = function(callback) { |
|
var name = this.getTestTopicName(); |
|
console.log('... creating Topic', name); |
|
this.pub.createTopic({name: name}, function(err, value) { |
|
if (err) { |
|
console.log('Could not create a topic: rpc failed with', err); |
|
callback(err); |
|
} else { |
|
console.log('created Topic', name, 'OK'); |
|
callback(null); |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.listSomeTopics = function(callback) { |
|
console.log('Listing topics'); |
|
console.log('-------------_'); |
|
this.listProjectTopics(function(err, response) { |
|
if (err) { |
|
console.log('Could not list topic: rpc failed with', err); |
|
callback(err); |
|
} else { |
|
_.each(response.topic, function(t) { |
|
console.log(t.name); |
|
}); |
|
callback(null); |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.checkExists = function(callback) { |
|
var name = this.getTestTopicName(); |
|
console.log('... checking for topic', name); |
|
this.topicExists(name, function(err, exists) { |
|
if (err) { |
|
console.log('Could not check for a topics: rpc failed with', err); |
|
callback(err); |
|
} else { |
|
if (exists) { |
|
console.log(name, 'is a topic'); |
|
} else { |
|
console.log(name, 'is not a topic'); |
|
} |
|
callback(null); |
|
} |
|
}); |
|
}; |
|
|
|
PubsubRunner.prototype.randomPubSub = function(callback) { |
|
var self = this; |
|
var topic_name = this.getTestTopicName(); |
|
var sub_name = this.getTestSubName(); |
|
var subscription = {name: sub_name, topic: topic_name}; |
|
async.waterfall([ |
|
_.bind(this.createTopicIfNeeded, this, topic_name), |
|
_.bind(this.sub.createSubscription, this.sub, subscription), |
|
function(resp, cb) { |
|
var msg_count = _.random(10, 30); |
|
// Set up msg_count messages to publish |
|
var message_senders = _.times(msg_count, function(n) { |
|
return _.bind(self.pub.publish, self.pub, { |
|
topic: topic_name, |
|
message: {data: new Buffer('message ' + n)} |
|
}); |
|
}); |
|
async.parallel(message_senders, function(err, result) { |
|
cb(err, result, msg_count); |
|
}); |
|
}, |
|
function(result, msg_count, cb) { |
|
console.log('Sent', msg_count, 'messages to', topic_name + ',', |
|
'checking for them now.'); |
|
var batch_request = { |
|
subscription: sub_name, |
|
max_events: msg_count |
|
}; |
|
self.sub.pullBatch(batch_request, cb); |
|
}, |
|
function(batch, cb) { |
|
var ack_id = _.pluck(batch.pull_responses, 'ack_id'); |
|
console.log('Got', ack_id.length, 'messages, acknowledging them...'); |
|
var ack_request = { |
|
subscription: sub_name, |
|
ack_id: ack_id |
|
}; |
|
self.sub.acknowledge(ack_request, cb); |
|
}, |
|
function(result, cb) { |
|
console.log( |
|
'Test messages were acknowledged OK, deleting the subscription'); |
|
self.sub.deleteSubscription({subscription: sub_name}, cb); |
|
} |
|
], function (err, result) { |
|
if (err) { |
|
console.log('Could not do random pub sub: rpc failed with', err); |
|
} |
|
callback(err, result); |
|
}); |
|
}; |
|
|
|
function main(callback) { |
|
var argv = parseArgs(process.argv, { |
|
string: [ |
|
'host', |
|
'oauth_scope', |
|
'port', |
|
'action', |
|
'project_id', |
|
'topic_name', |
|
'sub_name' |
|
], |
|
default: { |
|
host: 'pubsub-staging.googleapis.com', |
|
oauth_scope: 'https://www.googleapis.com/auth/pubsub', |
|
port: 443, |
|
action: 'listSomeTopics', |
|
project_id: 'stoked-keyword-656' |
|
} |
|
}); |
|
var valid_actions = [ |
|
'createTopic', |
|
'removeTopic', |
|
'listSomeTopics', |
|
'checkExists', |
|
'randomPubSub' |
|
]; |
|
if (_.some(valid_actions, function(action) { |
|
return action === argv.action; |
|
})) { |
|
callback(new Error('Action was not valid')); |
|
} |
|
var address = argv.host + ':' + argv.port; |
|
(new GoogleAuth()).getApplicationDefault(function(err, credential) { |
|
if (err) { |
|
callback(err); |
|
return; |
|
} |
|
if (credential.createScopedRequired()) { |
|
credential = credential.createScoped(argv.oauth_scope); |
|
} |
|
var updateMetadata = grpc.getGoogleAuthDelegate(credential); |
|
var ca_path = process.env.SSL_CERT_FILE; |
|
fs.readFile(ca_path, function(err, ca_data) { |
|
if (err) { |
|
callback(err); |
|
return; |
|
} |
|
var ssl_creds = grpc.Credentials.createSsl(ca_data); |
|
var options = { |
|
credentials: ssl_creds, |
|
'grpc.ssl_target_name_override': argv.host |
|
}; |
|
var pub = new pubsub.PublisherService(address, options, updateMetadata); |
|
var sub = new pubsub.SubscriberService(address, options, updateMetadata); |
|
var runner = new PubsubRunner(pub, sub, argv); |
|
runner[argv.action](callback); |
|
}); |
|
}); |
|
} |
|
|
|
if (require.main === module) { |
|
main(function(err) { |
|
if (err) throw err; |
|
}); |
|
} |
|
|
|
module.exports = PubsubRunner;
|
|
|