mirror of https://github.com/grpc/grpc.git
The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
7.8 KiB
241 lines
7.8 KiB
#!/usr/bin/env ruby |
|
|
|
# Copyright 2015 gRPC authors. |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
# pubsub_demo demos accesses the Google PubSub API via its gRPC interface |
|
# |
|
# $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \ |
|
# path/to/pubsub_demo.rb \ |
|
# [--action=<chosen_demo_action> ] |
|
# |
|
# There are options related to the chosen action, see #parse_args below. |
|
# - the possible actions are given by the method names of NamedAction class |
|
# - the default action is list_some_topics |
|
|
|
this_dir = File.expand_path(File.dirname(__FILE__)) |
|
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') |
|
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) |
|
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) |
|
|
|
require 'optparse' |
|
|
|
require 'grpc' |
|
require 'googleauth' |
|
require 'google/protobuf' |
|
|
|
require 'google/protobuf/empty' |
|
require 'tech/pubsub/proto/pubsub' |
|
require 'tech/pubsub/proto/pubsub_services' |
|
|
|
# creates a SSL Credentials from the production certificates. |
|
def ssl_creds |
|
GRPC::Core::ChannelCredentials.new() |
|
end |
|
|
|
# Builds the metadata authentication update proc. |
|
def auth_proc(opts) |
|
auth_creds = Google::Auth.get_application_default |
|
return auth_creds.updater_proc |
|
end |
|
|
|
# Creates a stub for accessing the publisher service. |
|
def publisher_stub(opts) |
|
address = "#{opts.host}:#{opts.port}" |
|
stub_clz = Tech::Pubsub::PublisherService::Stub # shorter |
|
GRPC.logger.info("... access PublisherService at #{address}") |
|
call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) |
|
combined_creds = ssl_creds.compose(call_creds) |
|
stub_clz.new(address, creds: combined_creds, |
|
GRPC::Core::Channel::SSL_TARGET => opts.host) |
|
end |
|
|
|
# Creates a stub for accessing the subscriber service. |
|
def subscriber_stub(opts) |
|
address = "#{opts.host}:#{opts.port}" |
|
stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter |
|
GRPC.logger.info("... access SubscriberService at #{address}") |
|
call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) |
|
combined_creds = ssl_creds.compose(call_creds) |
|
stub_clz.new(address, creds: combined_creds, |
|
GRPC::Core::Channel::SSL_TARGET => opts.host) |
|
end |
|
|
|
# defines methods corresponding to each interop test case. |
|
class NamedActions |
|
include Tech::Pubsub |
|
|
|
# Initializes NamedActions |
|
# |
|
# @param pub [Stub] a stub for accessing the publisher service |
|
# @param sub [Stub] a stub for accessing the publisher service |
|
# @param args [Args] provides access to the command line |
|
def initialize(pub, sub, args) |
|
@pub = pub |
|
@sub = sub |
|
@args = args |
|
end |
|
|
|
# Removes the test topic if it exists |
|
def remove_topic |
|
name = test_topic_name |
|
p "... removing Topic #{name}" |
|
@pub.delete_topic(DeleteTopicRequest.new(topic: name)) |
|
p "removed Topic: #{name} OK" |
|
rescue GRPC::BadStatus => e |
|
p "Could not delete a topics: rpc failed with '#{e}'" |
|
end |
|
|
|
# Creates a test topic |
|
def create_topic |
|
name = test_topic_name |
|
p "... creating Topic #{name}" |
|
resp = @pub.create_topic(Topic.new(name: name)) |
|
p "created Topic: #{resp.name} OK" |
|
rescue GRPC::BadStatus => e |
|
p "Could not create a topics: rpc failed with '#{e}'" |
|
end |
|
|
|
# Lists topics in the project |
|
def list_some_topics |
|
p 'Listing topics' |
|
p '-------------_' |
|
list_project_topics.topic.each { |t| p t.name } |
|
rescue GRPC::BadStatus => e |
|
p "Could not list topics: rpc failed with '#{e}'" |
|
end |
|
|
|
# Checks if a topics exists in a project |
|
def check_exists |
|
name = test_topic_name |
|
p "... checking for topic #{name}" |
|
exists = topic_exists?(name) |
|
p "#{name} is a topic" if exists |
|
p "#{name} is not a topic" unless exists |
|
rescue GRPC::BadStatus => e |
|
p "Could not check for a topics: rpc failed with '#{e}'" |
|
end |
|
|
|
# Publishes some messages |
|
def random_pub_sub |
|
topic_name, sub_name = test_topic_name, test_sub_name |
|
create_topic_if_needed(topic_name) |
|
@sub.create_subscription(Subscription.new(name: sub_name, |
|
topic: topic_name)) |
|
msg_count = rand(10..30) |
|
msg_count.times do |x| |
|
msg = PubsubMessage.new(data: "message #{x}") |
|
@pub.publish(PublishRequest.new(topic: topic_name, message: msg)) |
|
end |
|
p "Sent #{msg_count} messages to #{topic_name}, checking for them now." |
|
batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name, |
|
max_events: msg_count)) |
|
ack_ids = batch.pull_responses.map { |x| x.ack_id } |
|
p "Got #{ack_ids.size} messages; acknowledging them.." |
|
@sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name, |
|
ack_id: ack_ids)) |
|
p "Test messages were acknowledged OK, deleting the subscription" |
|
del_req = DeleteSubscriptionRequest.new(subscription: sub_name) |
|
@sub.delete_subscription(del_req) |
|
rescue GRPC::BadStatus => e |
|
p "Could not do random pub sub: rpc failed with '#{e}'" |
|
end |
|
|
|
private |
|
|
|
# test_topic_name is the topic name to use in this test. |
|
def test_topic_name |
|
unless @args.topic_name.nil? |
|
return "/topics/#{@args.project_id}/#{@args.topic_name}" |
|
end |
|
now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L') |
|
"/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}" |
|
end |
|
|
|
# test_sub_name is the subscription name to use in this test. |
|
def test_sub_name |
|
unless @args.sub_name.nil? |
|
return "/subscriptions/#{@args.project_id}/#{@args.sub_name}" |
|
end |
|
now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L') |
|
"/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}" |
|
end |
|
|
|
# determines if the topic name exists |
|
def topic_exists?(name) |
|
topics = list_project_topics.topic.map { |t| t.name } |
|
topics.include?(name) |
|
end |
|
|
|
def create_topic_if_needed(name) |
|
return if topic_exists?(name) |
|
@pub.create_topic(Topic.new(name: name)) |
|
end |
|
|
|
def list_project_topics |
|
q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})" |
|
@pub.list_topics(ListTopicsRequest.new(query: q)) |
|
end |
|
end |
|
|
|
# Args is used to hold the command line info. |
|
Args = Struct.new(:host, :port, :action, :project_id, :topic_name, |
|
:sub_name) |
|
|
|
# validates the command line options, returning them as an Arg. |
|
def parse_args |
|
args = Args.new('pubsub-staging.googleapis.com', |
|
443, 'list_some_topics', 'stoked-keyword-656') |
|
OptionParser.new do |opts| |
|
opts.on('--server_host SERVER_HOST', 'server hostname') do |v| |
|
args.host = v |
|
end |
|
opts.on('--server_port SERVER_PORT', 'server port') do |v| |
|
args.port = v |
|
end |
|
|
|
# instance_methods(false) gives only the methods defined in that class. |
|
scenes = NamedActions.instance_methods(false).map { |t| t.to_s } |
|
scene_list = scenes.join(',') |
|
opts.on("--action CODE", scenes, {}, 'pick a demo action', |
|
" (#{scene_list})") do |v| |
|
args.action = v |
|
end |
|
|
|
# Set the remaining values. |
|
%w(project_id topic_name sub_name).each do |o| |
|
opts.on("--#{o} VALUE", "#{o}") do |v| |
|
args[o] = v |
|
end |
|
end |
|
end.parse! |
|
_check_args(args) |
|
end |
|
|
|
def _check_args(args) |
|
%w(host port action).each do |a| |
|
if args[a].nil? |
|
raise OptionParser::MissingArgument.new("please specify --#{a}") |
|
end |
|
end |
|
args |
|
end |
|
|
|
def main |
|
args = parse_args |
|
pub, sub = publisher_stub(args), subscriber_stub(args) |
|
NamedActions.new(pub, sub, args).method(args.action).call |
|
end |
|
|
|
main
|
|
|