Got Ruby stress client working, with some modifications to interop tests

pull/6266/head
murgatroid99 9 years ago
parent e621f13ecd
commit ddaa69f15d
  1. 28
      src/ruby/pb/grpc/testing/metrics.rb
  2. 27
      src/ruby/pb/grpc/testing/metrics_services.rb
  3. 28
      src/ruby/pb/test/client.rb
  4. 2
      src/ruby/pb/test/server.rb
  5. 83
      src/ruby/stress/metrics_server.rb
  6. 155
      src/ruby/stress/stress_client.rb

@ -0,0 +1,28 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: grpc/testing/metrics.proto
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.GaugeResponse" do
optional :name, :string, 1
oneof :value do
optional :long_value, :int64, 2
optional :double_value, :double, 3
optional :string_value, :string, 4
end
end
add_message "grpc.testing.GaugeRequest" do
optional :name, :string, 1
end
add_message "grpc.testing.EmptyMessage" do
end
end
module Grpc
module Testing
GaugeResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeResponse").msgclass
GaugeRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeRequest").msgclass
EmptyMessage = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EmptyMessage").msgclass
end
end

@ -0,0 +1,27 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# Source: grpc/testing/metrics.proto for package 'grpc.testing'
require 'grpc'
require 'grpc/testing/metrics'
module Grpc
module Testing
module MetricsService
# TODO: add proto service documentation here
class Service
include GRPC::GenericService
self.marshal_class_method = :encode
self.unmarshal_class_method = :decode
self.service_name = 'grpc.testing.MetricsService'
rpc :GetAllGauges, EmptyMessage, stream(GaugeResponse)
rpc :GetGauge, GaugeRequest, GaugeResponse
end
Stub = Service.rpc_stub_class
end
end
end

@ -38,23 +38,23 @@
# --server_port=<port> \
# --test_case=<testcase_name>
# These lines are required for the generated files to load grpc
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
pb_dir = File.dirname(File.dirname(this_dir))
pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'optparse'
require 'logger'
require 'grpc'
require_relative '../../lib/grpc'
require 'googleauth'
require 'google/protobuf'
require 'test/proto/empty'
require 'test/proto/messages'
require 'test/proto/test_services'
require_relative 'proto/empty'
require_relative 'proto/messages'
require_relative 'proto/test_services'
AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
@ -208,12 +208,10 @@ class NamedTests
def empty_unary
resp = @stub.empty_call(Empty.new)
assert('empty_unary: invalid response') { resp.is_a?(Empty) }
p 'OK: empty_unary'
end
def large_unary
perform_large_unary
p 'OK: large_unary'
end
def service_account_creds
@ -230,7 +228,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
p "OK: #{__callee__}"
end
def jwt_token_creds
@ -238,7 +235,6 @@ class NamedTests
wanted_email = MultiJson.load(json_key)['client_email']
resp = perform_large_unary(fill_username: true)
assert("#{__callee__}: bad username") { wanted_email == resp.username }
p "OK: #{__callee__}"
end
def compute_engine_creds
@ -247,7 +243,6 @@ class NamedTests
assert("#{__callee__}: bad username") do
@args.default_service_account == resp.username
end
p "OK: #{__callee__}"
end
def oauth2_auth_token
@ -259,7 +254,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
p "OK: #{__callee__}"
end
def per_rpc_creds
@ -279,7 +273,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
p "OK: #{__callee__}"
end
def client_streaming
@ -293,7 +286,6 @@ class NamedTests
assert("#{__callee__}: aggregate payload size is incorrect") do
wanted_aggregate_size == resp.aggregated_payload_size
end
p "OK: #{__callee__}"
end
def server_streaming
@ -311,7 +303,6 @@ class NamedTests
:COMPRESSABLE == r.payload.type
end
end
p "OK: #{__callee__}"
end
def ping_pong
@ -319,7 +310,6 @@ class NamedTests
ppp = PingPongPlayer.new(msg_sizes)
resps = @stub.full_duplex_call(ppp.each_item)
resps.each { |r| ppp.queue.push(r) }
p "OK: #{__callee__}"
end
def timeout_on_sleeping_server
@ -332,7 +322,6 @@ class NamedTests
assert("#{__callee__}: status was wrong") do
e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED
end
p "OK: #{__callee__}"
end
def empty_stream
@ -346,7 +335,6 @@ class NamedTests
assert("#{__callee__}: too many responses expected 0") do
count == 0
end
p "OK: #{__callee__}"
end
def cancel_after_begin
@ -361,7 +349,6 @@ class NamedTests
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
p "OK: #{__callee__}"
end
def cancel_after_first_response
@ -374,7 +361,6 @@ class NamedTests
rescue GRPC::Cancelled
assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
op.wait
p "OK: #{__callee__}"
end
def all
@ -442,7 +428,7 @@ def parse_args
opts.on('--use_tls USE_TLS', ['false', 'true'],
'require a secure connection?') do |v|
args['secure'] = v == 'true'
end
p end
opts.on('--use_test_ca USE_TEST_CA', ['false', 'true'],
'if secure, use the test certificate?') do |v|
args['use_test_ca'] = v == 'true'

@ -39,7 +39,7 @@
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
pb_dir = File.dirname(File.dirname(this_dir))
pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)

@ -0,0 +1,83 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require_relative '../pb/grpc/testing/metrics.rb'
require_relative '../pb/grpc/testing/metrics_services.rb'
class Gauge
def get_name
raise NoMethodError.new
end
def get_type
raise NoMethodError.new
end
def get_value
raise NoMethodError.new
end
end
class MetricsServiceImpl < Grpc::Testing::MetricsService::Service
include Grpc::Testing
@gauges
def initialize
@gauges = {}
end
def register_gauge(gauge)
@gauges[gauge.get_name] = gauge
end
def make_gauge_response(gauge)
response = GaugeResponse.new(:name => gauge.get_name)
value = gauge.get_value
case gauge.get_type
when 'long'
response.long_value = value
when 'double'
response.double_value = value
when 'string'
response.string_value = value
end
response
end
def get_all_gauges(_empty, _call)
@gauges.values.map do |gauge|
make_gauge_response gauge
end
end
def get_gauge(gauge_req, _call)
gauge = @gauges[gauge_req.name]
make_gauge_response gauge
end
end

@ -0,0 +1,155 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'optparse'
require 'thread'
require_relative '../pb/test/client'
require_relative './metrics_server'
require_relative '../lib/grpc'
class QpsGauge < Gauge
@query_count
@query_mutex
@start_time
def initialize
@query_count = 0
@query_mutex = Mutex.new
@start_time = Time.now
end
def increment_queries
@query_mutex.synchronize { @query_count += 1}
end
def get_name
'qps'
end
def get_type
'long'
end
def get_value
(@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i
end
end
def start_metrics_server(port)
host = "0.0.0.0:#{port}"
server = GRPC::RpcServer.new
server.add_http2_port(host, :this_port_is_insecure)
service = MetricsServiceImpl.new
server.handle(service)
server_thread = Thread.new { server.run_till_terminated }
[server, service, server_thread]
end
StressArgs = Struct.new(:server_addresses, :test_cases, :duration,
:channels_per_server, :concurrent_calls, :metrics_port)
def start(stress_args)
running = true
threads = []
qps_gauge = QpsGauge.new
metrics_server, metrics_service, metrics_thread =
start_metrics_server(stress_args.metrics_port)
metrics_service.register_gauge(qps_gauge)
stress_args.server_addresses.each do |address|
stress_args.channels_per_server.times do
client_args = Args.new
client_args.host, client_args.port = address.split(':')
client_args.secure = false
client_args.test_case = ''
stub = create_stub(client_args)
named_tests = NamedTests.new(stub, client_args)
stress_args.concurrent_calls.times do
threads << Thread.new do
while running
named_tests.method(stress_args.test_cases.sample).call
qps_gauge.increment_queries
end
end
end
end
end
if stress_args.duration >= 0
sleep stress_args.duration
running = false
metrics_server.stop
p "QPS: #{qps_gauge.get_value}"
threads.each { |thd| thd.join; }
end
metrics_thread.join
end
def parse_stress_args
stress_args = StressArgs.new
stress_args.server_addresses = ['localhost:8080']
stress_args.test_cases = []
stress_args.duration = -1
stress_args.channels_per_server = 1
stress_args.concurrent_calls = 1
stress_args.metrics_port = '8081'
OptionParser.new do |opts|
opts.on('--server_addresses [LIST]', Array) do |addrs|
stress_args.server_addresses = addrs
end
opts.on('--test_cases cases', Array) do |cases|
stress_args.test_cases = (cases.map do |item|
split = item.split(':')
[split[0]] * split[1].to_i
end).reduce([], :+)
end
opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
stress_args.duration = time
end
opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
stress_args.channels_per_server = channels
end
opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
stress_args.concurrent_calls = stubs
end
opts.on('--metrics_port [port]') do |port|
stress_args.metrics_port = port
end
end.parse!
stress_args
end
def main
opts = parse_stress_args
start(opts)
end
if __FILE__ == $0
main
end
Loading…
Cancel
Save