Add support for unary client, closed-loop or poisson

pull/6010/head
vjpai 9 years ago
parent d7f43b3aab
commit ad1c1cc6cf
  1. 81
      src/ruby/qps/client.rb
  2. 8
      src/ruby/qps/qps-common.rb
  3. 11
      src/ruby/qps/server.rb
  4. 3
      src/ruby/qps/worker.rb

@ -55,3 +55,84 @@ class Poisson
end
end
class BenchmarkClient
def initialize(config)
if config.security_params
if config.security_params.use_test_ca
certs = load_test_certs
cred = GRPC::Core::Credentials.new(certs[0])
else
p 'Unsupported to use non-test CA (TBD)'
exit
end
if config.security_params.server_host_override
p 'Unsupported to use severt host override (TBD)'
exit
end
else
cred = :this_channel_is_insecure
end
@histres = config.histogram_params.resolution
@histmax = config.histogram_params.max_possible
@start_time = Time.now
@histogram = Histogram.new(@histres, @histmax)
@done = false
(0..config.client_channels-1).each do |i|
Thread.new {
stub = ''
req = Grpc::Testing::SimpleRequest.new(response_type: Grpc::Testing::PayloadType::COMPRESSABLE,
response_size: config.payload_config.simple_params.resp_size,
payload: Grpc::Testing::Payload.new(type: Grpc::Testing::PayloadType::COMPRESSABLE,
body: nulls(config.payload_config.simple_params.req_size)))
case config.load_params.load.to_s
when 'closed_loop'
waiter = nil
when 'poisson'
waiter = Poisson.new(config.load_params.poisson.offered_load / config.client_channels)
end
stub = Grpc::Testing::BenchmarkService::Stub.new(config.server_targets[i % config.server_targets.length], cred)
case config.rpc_type
when :UNARY
unary_ping_ponger(req,stub,config,waiter)
when :STREAMING
streaming_ping_ponger(req,stub,config,waiter)
end
}
end
end
def wait_to_issue(waiter)
if waiter
delay = waiter.advance-Time.now
sleep delay if delay > 0
end
end
def unary_ping_ponger(req, stub, config,waiter)
while !@done
wait_to_issue(waiter)
start = Time.now
resp = stub.unary_call(req)
@histogram.add((Time.now-start)*1e9)
end
end
def streaming_ping_ponger(req, stub, config, waiter)
end
def mark(reset)
lat = Grpc::Testing::HistogramData.new(
bucket: @histogram.contents,
min_seen: @histogram.minimum,
max_seen: @histogram.maximum,
sum: @histogram.sum,
sum_of_squares: @histogram.sum_of_squares,
count: @histogram.count
)
elapsed = Time.now-@start_time
if reset
@start_time = Time.now
@histogram = Histogram.new(@histres, @histmax)
end
Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed)
end
def shutdown
@done = true
end
end

@ -44,6 +44,14 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit')
end
# load the test-only certificates
def load_test_certs
this_dir = File.expand_path(File.dirname(__FILE__))
data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(data_dir, f)).read }
end
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable

@ -62,13 +62,6 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
end
end
def load_test_certs
this_dir = File.expand_path(File.dirname(__FILE__))
data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(data_dir, f)).read }
end
class BenchmarkServer
def initialize(config, port)
if config.security_params
@ -87,9 +80,7 @@ class BenchmarkServer
end
def mark(reset)
s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f)
if reset
@start_time = Time.now
end
@start_time = Time.now if reset
s
end
def get_port

@ -73,7 +73,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
Thread.new {
client = ''
reqs.each do |req|
case req.argtype
case req.argtype.to_s
when 'setup'
client = BenchmarkClient.new(req.setup)
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
@ -82,6 +82,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
end
end
q.push(self)
client.shutdown
}
q.each_item
end

Loading…
Cancel
Save