Merge pull request #6010 from vjpai/i_know_rubies

Ruby performance testing worker
pull/6042/head
Jan Tattermusch 9 years ago
commit eddcd85a72
  1. 9
      src/ruby/pb/generate_proto_ruby.sh
  2. 164
      src/ruby/qps/client.rb
  3. 88
      src/ruby/qps/histogram.rb
  4. 76
      src/ruby/qps/qps-common.rb
  5. 91
      src/ruby/qps/server.rb
  6. 129
      src/ruby/qps/src/proto/grpc/testing/control.rb
  7. 80
      src/ruby/qps/src/proto/grpc/testing/messages.rb
  8. 33
      src/ruby/qps/src/proto/grpc/testing/payloads.rb
  9. 14
      src/ruby/qps/src/proto/grpc/testing/services.rb
  10. 46
      src/ruby/qps/src/proto/grpc/testing/services_services.rb
  11. 39
      src/ruby/qps/src/proto/grpc/testing/stats.rb
  12. 128
      src/ruby/qps/worker.rb

@ -40,11 +40,18 @@ $PROTOC -I src/proto src/proto/grpc/health/v1/health.proto \
--ruby_out=src/ruby/pb \
--plugin=$PLUGIN
$PROTOC -I . test/proto/{messages,test,empty}.proto \
$PROTOC -I . \
src/proto/grpc/testing/{messages,test,empty}.proto \
--grpc_out=src/ruby/pb \
--ruby_out=src/ruby/pb \
--plugin=$PLUGIN
$PROTOC -I . \
src/proto/grpc/testing/{messages,payloads,stats,services,control}.proto \
--grpc_out=src/ruby/qps \
--ruby_out=src/ruby/qps \
--plugin=$PLUGIN
$PROTOC -I src/proto/math src/proto/math/math.proto \
--grpc_out=src/ruby/bin \
--ruby_out=src/ruby/bin \

@ -0,0 +1,164 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Worker and worker service implementation
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
require 'histogram'
require 'src/proto/grpc/testing/services_services'
class Poisson
def interarrival
@lambda_recip * (-Math.log(1.0-rand))
end
def advance
t = @next_time
@next_time += interarrival
t
end
def initialize(lambda)
@lambda_recip = 1.0/lambda
@next_time = Time.now + interarrival
end
end
class BenchmarkClient
def initialize(config)
opts = {}
if config.security_params
if config.security_params.use_test_ca
certs = load_test_certs
cred = GRPC::Core::ChannelCredentials.new(certs[0])
else
cred = GRPC::Core::ChannelCredentials.new()
end
if config.security_params.server_host_override
opts[GRPC::Core::Channel::SSL_TARGET] =
config.security_params.server_host_override
end
else
cred = :this_channel_is_insecure
end
@histres = config.histogram_params.resolution
@histmax = config.histogram_params.max_possible
@start_time = Time.now
@histogram = Histogram.new(@histres, @histmax)
@done = false
gtsr = Grpc::Testing::SimpleRequest
gtpt = Grpc::Testing::PayloadType
gtp = Grpc::Testing::Payload
simple_params = config.payload_config.simple_params
req = gtsr.new(response_type: gtpt::COMPRESSABLE,
response_size: simple_params.resp_size,
payload: gtp.new(type: gtpt::COMPRESSABLE,
body: nulls(simple_params.req_size)))
(0..config.client_channels-1).each do |chan|
gtbss = Grpc::Testing::BenchmarkService::Stub
st = config.server_targets
stub = gtbss.new(st[chan % st.length], cred, **opts)
(0..config.outstanding_rpcs_per_channel-1).each do |r|
Thread.new {
case config.load_params.load.to_s
when 'closed_loop'
waiter = nil
when 'poisson'
waiter = Poisson.new(config.load_params.poisson.offered_load /
(config.client_channels *
config.outstanding_rpcs_per_channel))
end
case config.rpc_type
when :UNARY
unary_ping_ponger(req,stub,config,waiter)
when :STREAMING
streaming_ping_ponger(req,stub,config,waiter)
end
}
end
end
end
def wait_to_issue(waiter)
if waiter
delay = waiter.advance-Time.now
sleep delay if delay > 0
end
end
def unary_ping_ponger(req, stub, config,waiter)
while !@done
wait_to_issue(waiter)
start = Time.now
resp = stub.unary_call(req)
@histogram.add((Time.now-start)*1e9)
end
end
def streaming_ping_ponger(req, stub, config, waiter)
q = EnumeratorQueue.new(self)
resp = stub.streaming_call(q.each_item)
start = Time.now
q.push(req)
resp.each do |r|
@histogram.add((Time.now-start)*1e9)
if !@done
wait_to_issue(waiter)
start = Time.now
q.push(req)
else
q.push(self)
break
end
end
end
def mark(reset)
lat = Grpc::Testing::HistogramData.new(
bucket: @histogram.contents,
min_seen: @histogram.minimum,
max_seen: @histogram.maximum,
sum: @histogram.sum,
sum_of_squares: @histogram.sum_of_squares,
count: @histogram.count
)
elapsed = Time.now-@start_time
if reset
@start_time = Time.now
@histogram = Histogram.new(@histres, @histmax)
end
Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed)
end
def shutdown
@done = true
end
end

@ -0,0 +1,88 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Histogram class for use in performance testing and measurement
class Histogram
# Determine the bucket index for a given value
# @param {number} value The value to check
# @return {number} The bucket index
def bucket_for(value)
(Math.log(value)/Math.log(@multiplier)).to_i
end
# Initialize an empty histogram
# @param {number} resolution The resolution of the histogram
# @param {number} max_possible The maximum value for the histogram
def initialize(resolution, max_possible)
@resolution=resolution
@max_possible=max_possible
@sum=0
@sum_of_squares=0
@multiplier=1+resolution
@count=0
@min_seen=max_possible
@max_seen=0
@buckets=Array.new(bucket_for(max_possible)+1, 0)
end
# Add a value to the histogram. This updates all statistics with the new
# value. Those statistics should not be modified except with this function
# @param {number} value The value to add
def add(value)
@sum += value
@sum_of_squares += value * value
@count += 1
if value < @min_seen
@min_seen = value
end
if value > @max_seen
@max_seen = value
end
@buckets[bucket_for(value)] += 1
end
def minimum
@min_seen
end
def maximum
@max_seen
end
def sum
@sum
end
def sum_of_squares
@sum_of_squares
end
def count
@count
end
def contents
@buckets
end
end

@ -0,0 +1,76 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Worker and worker service implementation
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
# produces a string of null chars (\0 aka pack 'x') of length l.
def nulls(l)
fail 'requires #{l} to be +ve' if l < 0
[].pack('x' * l).force_encoding('ascii-8bit')
end
# load the test-only certificates
def load_test_certs
this_dir = File.expand_path(File.dirname(__FILE__))
data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(data_dir, f)).read }
end
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable
def_delegators :@q, :push
def initialize(sentinel)
@q = Queue.new
@sentinel = sentinel
end
def each_item
return enum_for(:each_item) unless block_given?
loop do
r = @q.pop
break if r.equal?(@sentinel)
fail r if r.is_a? Exception
yield r
end
end
end

@ -0,0 +1,91 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Worker and worker service implementation
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
require 'qps-common'
require 'src/proto/grpc/testing/messages'
require 'src/proto/grpc/testing/services_services'
require 'src/proto/grpc/testing/stats'
class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
def unary_call(req, _call)
sr = Grpc::Testing::SimpleResponse
pl = Grpc::Testing::Payload
sr.new(payload: pl.new(body: nulls(req.response_size)))
end
def streaming_call(reqs)
q = EnumeratorQueue.new(self)
Thread.new {
sr = Grpc::Testing::SimpleResponse
pl = Grpc::Testing::Payload
reqs.each do |req|
q.push(sr.new(payload: pl.new(body: nulls(req.response_size))))
end
q.push(self)
}
q.each_item
end
end
class BenchmarkServer
def initialize(config, port)
if config.security_params
certs = load_test_certs
cred = GRPC::Core::ServerCredentials.new(
nil, [{private_key: certs[1], cert_chain: certs[2]}], false)
else
cred = :this_port_is_insecure
end
@server = GRPC::RpcServer.new
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
Thread.new {
@server.run
}
end
def mark(reset)
s = Grpc::Testing::ServerStats.new(time_elapsed:
(Time.now-@start_time).to_f)
@start_time = Time.now if reset
s
end
def get_port
@port
end
end

@ -0,0 +1,129 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/proto/grpc/testing/control.proto
require 'google/protobuf'
require 'src/proto/grpc/testing/payloads'
require 'src/proto/grpc/testing/stats'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.PoissonParams" do
optional :offered_load, :double, 1
end
add_message "grpc.testing.UniformParams" do
optional :interarrival_lo, :double, 1
optional :interarrival_hi, :double, 2
end
add_message "grpc.testing.DeterministicParams" do
optional :offered_load, :double, 1
end
add_message "grpc.testing.ParetoParams" do
optional :interarrival_base, :double, 1
optional :alpha, :double, 2
end
add_message "grpc.testing.ClosedLoopParams" do
end
add_message "grpc.testing.LoadParams" do
oneof :load do
optional :closed_loop, :message, 1, "grpc.testing.ClosedLoopParams"
optional :poisson, :message, 2, "grpc.testing.PoissonParams"
optional :uniform, :message, 3, "grpc.testing.UniformParams"
optional :determ, :message, 4, "grpc.testing.DeterministicParams"
optional :pareto, :message, 5, "grpc.testing.ParetoParams"
end
end
add_message "grpc.testing.SecurityParams" do
optional :use_test_ca, :bool, 1
optional :server_host_override, :string, 2
end
add_message "grpc.testing.ClientConfig" do
repeated :server_targets, :string, 1
optional :client_type, :enum, 2, "grpc.testing.ClientType"
optional :security_params, :message, 3, "grpc.testing.SecurityParams"
optional :outstanding_rpcs_per_channel, :int32, 4
optional :client_channels, :int32, 5
optional :async_client_threads, :int32, 7
optional :rpc_type, :enum, 8, "grpc.testing.RpcType"
optional :load_params, :message, 10, "grpc.testing.LoadParams"
optional :payload_config, :message, 11, "grpc.testing.PayloadConfig"
optional :histogram_params, :message, 12, "grpc.testing.HistogramParams"
repeated :core_list, :int32, 13
optional :core_limit, :int32, 14
end
add_message "grpc.testing.ClientStatus" do
optional :stats, :message, 1, "grpc.testing.ClientStats"
end
add_message "grpc.testing.Mark" do
optional :reset, :bool, 1
end
add_message "grpc.testing.ClientArgs" do
oneof :argtype do
optional :setup, :message, 1, "grpc.testing.ClientConfig"
optional :mark, :message, 2, "grpc.testing.Mark"
end
end
add_message "grpc.testing.ServerConfig" do
optional :server_type, :enum, 1, "grpc.testing.ServerType"
optional :security_params, :message, 2, "grpc.testing.SecurityParams"
optional :port, :int32, 4
optional :async_server_threads, :int32, 7
optional :core_limit, :int32, 8
optional :payload_config, :message, 9, "grpc.testing.PayloadConfig"
repeated :core_list, :int32, 10
end
add_message "grpc.testing.ServerArgs" do
oneof :argtype do
optional :setup, :message, 1, "grpc.testing.ServerConfig"
optional :mark, :message, 2, "grpc.testing.Mark"
end
end
add_message "grpc.testing.ServerStatus" do
optional :stats, :message, 1, "grpc.testing.ServerStats"
optional :port, :int32, 2
optional :cores, :int32, 3
end
add_message "grpc.testing.CoreRequest" do
end
add_message "grpc.testing.CoreResponse" do
optional :cores, :int32, 1
end
add_message "grpc.testing.Void" do
end
add_enum "grpc.testing.ClientType" do
value :SYNC_CLIENT, 0
value :ASYNC_CLIENT, 1
end
add_enum "grpc.testing.ServerType" do
value :SYNC_SERVER, 0
value :ASYNC_SERVER, 1
value :ASYNC_GENERIC_SERVER, 2
end
add_enum "grpc.testing.RpcType" do
value :UNARY, 0
value :STREAMING, 1
end
end
module Grpc
module Testing
PoissonParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PoissonParams").msgclass
UniformParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.UniformParams").msgclass
DeterministicParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.DeterministicParams").msgclass
ParetoParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ParetoParams").msgclass
ClosedLoopParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClosedLoopParams").msgclass
LoadParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.LoadParams").msgclass
SecurityParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SecurityParams").msgclass
ClientConfig = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClientConfig").msgclass
ClientStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClientStatus").msgclass
Mark = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Mark").msgclass
ClientArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClientArgs").msgclass
ServerConfig = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ServerConfig").msgclass
ServerArgs = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ServerArgs").msgclass
ServerStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ServerStatus").msgclass
CoreRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CoreRequest").msgclass
CoreResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CoreResponse").msgclass
Void = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Void").msgclass
ClientType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClientType").enummodule
ServerType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ServerType").enummodule
RpcType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.RpcType").enummodule
end
end

@ -0,0 +1,80 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/proto/grpc/testing/messages.proto
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.Payload" do
optional :type, :enum, 1, "grpc.testing.PayloadType"
optional :body, :bytes, 2
end
add_message "grpc.testing.EchoStatus" do
optional :code, :int32, 1
optional :message, :string, 2
end
add_message "grpc.testing.SimpleRequest" do
optional :response_type, :enum, 1, "grpc.testing.PayloadType"
optional :response_size, :int32, 2
optional :payload, :message, 3, "grpc.testing.Payload"
optional :fill_username, :bool, 4
optional :fill_oauth_scope, :bool, 5
optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
end
add_message "grpc.testing.SimpleResponse" do
optional :payload, :message, 1, "grpc.testing.Payload"
optional :username, :string, 2
optional :oauth_scope, :string, 3
end
add_message "grpc.testing.StreamingInputCallRequest" do
optional :payload, :message, 1, "grpc.testing.Payload"
end
add_message "grpc.testing.StreamingInputCallResponse" do
optional :aggregated_payload_size, :int32, 1
end
add_message "grpc.testing.ResponseParameters" do
optional :size, :int32, 1
optional :interval_us, :int32, 2
end
add_message "grpc.testing.StreamingOutputCallRequest" do
optional :response_type, :enum, 1, "grpc.testing.PayloadType"
repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters"
optional :payload, :message, 3, "grpc.testing.Payload"
optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
end
add_message "grpc.testing.StreamingOutputCallResponse" do
optional :payload, :message, 1, "grpc.testing.Payload"
end
add_message "grpc.testing.ReconnectInfo" do
optional :passed, :bool, 1
repeated :backoff_ms, :int32, 2
end
add_enum "grpc.testing.PayloadType" do
value :COMPRESSABLE, 0
value :UNCOMPRESSABLE, 1
value :RANDOM, 2
end
add_enum "grpc.testing.CompressionType" do
value :NONE, 0
value :GZIP, 1
value :DEFLATE, 2
end
end
module Grpc
module Testing
Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass
EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass
SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass
SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass
StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass
StreamingInputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallResponse").msgclass
ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass
StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass
StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass
ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass
PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule
CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule
end
end

@ -0,0 +1,33 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/proto/grpc/testing/payloads.proto
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.ByteBufferParams" do
optional :req_size, :int32, 1
optional :resp_size, :int32, 2
end
add_message "grpc.testing.SimpleProtoParams" do
optional :req_size, :int32, 1
optional :resp_size, :int32, 2
end
add_message "grpc.testing.ComplexProtoParams" do
end
add_message "grpc.testing.PayloadConfig" do
oneof :payload do
optional :bytebuf_params, :message, 1, "grpc.testing.ByteBufferParams"
optional :simple_params, :message, 2, "grpc.testing.SimpleProtoParams"
optional :complex_params, :message, 3, "grpc.testing.ComplexProtoParams"
end
end
end
module Grpc
module Testing
ByteBufferParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ByteBufferParams").msgclass
SimpleProtoParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleProtoParams").msgclass
ComplexProtoParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ComplexProtoParams").msgclass
PayloadConfig = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadConfig").msgclass
end
end

@ -0,0 +1,14 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/proto/grpc/testing/services.proto
require 'google/protobuf'
require 'src/proto/grpc/testing/messages'
require 'src/proto/grpc/testing/control'
Google::Protobuf::DescriptorPool.generated_pool.build do
end
module Grpc
module Testing
end
end

@ -0,0 +1,46 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# Source: src/proto/grpc/testing/services.proto for package 'grpc.testing'
require 'grpc'
require 'src/proto/grpc/testing/services'
module Grpc
module Testing
module BenchmarkService
# TODO: add proto service documentation here
class Service
include GRPC::GenericService
self.marshal_class_method = :encode
self.unmarshal_class_method = :decode
self.service_name = 'grpc.testing.BenchmarkService'
rpc :UnaryCall, SimpleRequest, SimpleResponse
rpc :StreamingCall, stream(SimpleRequest), stream(SimpleResponse)
end
Stub = Service.rpc_stub_class
end
module WorkerService
# TODO: add proto service documentation here
class Service
include GRPC::GenericService
self.marshal_class_method = :encode
self.unmarshal_class_method = :decode
self.service_name = 'grpc.testing.WorkerService'
rpc :RunServer, stream(ServerArgs), stream(ServerStatus)
rpc :RunClient, stream(ClientArgs), stream(ClientStatus)
rpc :CoreCount, CoreRequest, CoreResponse
rpc :QuitWorker, Void, Void
end
Stub = Service.rpc_stub_class
end
end
end

@ -0,0 +1,39 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/proto/grpc/testing/stats.proto
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.ServerStats" do
optional :time_elapsed, :double, 1
optional :time_user, :double, 2
optional :time_system, :double, 3
end
add_message "grpc.testing.HistogramParams" do
optional :resolution, :double, 1
optional :max_possible, :double, 2
end
add_message "grpc.testing.HistogramData" do
repeated :bucket, :uint32, 1
optional :min_seen, :double, 2
optional :max_seen, :double, 3
optional :sum, :double, 4
optional :sum_of_squares, :double, 5
optional :count, :double, 6
end
add_message "grpc.testing.ClientStats" do
optional :latencies, :message, 1, "grpc.testing.HistogramData"
optional :time_elapsed, :double, 2
optional :time_user, :double, 3
optional :time_system, :double, 4
end
end
module Grpc
module Testing
ServerStats = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ServerStats").msgclass
HistogramParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.HistogramParams").msgclass
HistogramData = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.HistogramData").msgclass
ClientStats = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ClientStats").msgclass
end
end

@ -0,0 +1,128 @@
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Worker and worker service implementation
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
require 'optparse'
require 'histogram'
require 'etc'
require 'facter'
require 'client'
require 'qps-common'
require 'server'
require 'src/proto/grpc/testing/services_services'
class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
def cpu_cores
Facter.value('processors')['count']
end
def run_server(reqs)
q = EnumeratorQueue.new(self)
Thread.new {
bms = ''
gtss = Grpc::Testing::ServerStatus
reqs.each do |req|
case req.argtype.to_s
when 'setup'
bms = BenchmarkServer.new(req.setup, @server_port)
q.push(gtss.new(stats: bms.mark(false), port: bms.get_port))
when 'mark'
q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
end
end
q.push(self)
bms.stop
}
q.each_item
end
def run_client(reqs)
q = EnumeratorQueue.new(self)
Thread.new {
client = ''
reqs.each do |req|
case req.argtype.to_s
when 'setup'
client = BenchmarkClient.new(req.setup)
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
when 'mark'
q.push(Grpc::Testing::ClientStatus.new(stats:
client.mark(req.mark.reset)))
end
end
q.push(self)
client.shutdown
}
q.each_item
end
def core_count(_args, _call)
Grpc::Testing::CoreResponse.new(cores: cpu_cores)
end
def quit_worker(_args, _call)
Thread.new {
sleep 3
@server.stop
}
Grpc::Testing::Void.new
end
def initialize(s, sp)
@server = s
@server_port = sp
end
end
def main
options = {
'driver_port' => 0,
'server_port' => 0
}
OptionParser.new do |opts|
opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
opts.on('--driver_port PORT', '<port>') do |v|
options['driver_port'] = v
end
opts.on('--server_port PORT', '<port>') do |v|
options['server_port'] = v
end
end.parse!
s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)
s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i))
s.run
end
main
Loading…
Cancel
Save