Merge branch 'master' into hpack_table

pull/6029/head
Yang Gao 9 years ago
commit 39afce836d
  1. 36
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  2. 123
      src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
  3. 1
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  4. 148
      src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs
  5. 6
      src/ruby/ext/grpc/rb_call.c
  6. 35
      src/ruby/spec/generic/client_stub_spec.rb
  7. 28
      tools/run_tests/performance/__init__.py
  8. 15
      tools/run_tests/performance/build_performance.sh
  9. 153
      tools/run_tests/performance/scenario_config.py
  10. 133
      tools/run_tests/run_performance_tests.py

@ -31,13 +31,13 @@
*
*/
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/client_config/lb_policy_registry.h"
@ -263,22 +263,24 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
r->lb_policy_name = NULL;
if (0 != strcmp(args->uri->query, "")) {
gpr_slice query_slice;
gpr_slice_buffer query_parts;
query_slice =
gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing);
gpr_slice_buffer_init(&query_parts);
gpr_slice_split(query_slice, "=", &query_parts);
GPR_ASSERT(query_parts.count == 2);
if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
}
gpr_slice_buffer_destroy(&query_parts);
gpr_slice_unref(query_slice);
r->lb_policy_name =
gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
const char *lb_enabled_qpart =
grpc_uri_get_query_arg(args->uri, "lb_enabled");
/* anything other than "0" is interpreted as true */
const bool lb_enabled =
(lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
!lb_enabled) {
/* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
* out, as this is meant mostly for tests. */
gpr_log(GPR_ERROR,
"Requested 'grpclb' LB policy but resolved addresses don't "
"support load balancing.");
abort();
}
if (r->lb_policy_name == NULL) {
r->lb_policy_name = gpr_strdup(default_lb_policy_name);
}

@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting
public static IClientRunner CreateStarted(ClientConfig config)
{
Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
"Only closed loop scenario supported for C#");
GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
if (config.OutstandingRpcsPerChannel != 0)
{
Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
}
if (config.AsyncClientThreads != 0)
{
Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting
Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
}
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams);
return new ClientRunnerImpl(channels,
config.ClientType,
config.RpcType,
config.OutstandingRpcsPerChannel,
config.LoadParams,
config.PayloadConfig,
config.HistogramParams);
}
private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams)
{
GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified.");
var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
List<ChannelOption> channelOptions = null;
if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
if (securityParams != null && securityParams.ServerHostOverride != "")
{
channelOptions = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride)
};
}
var channel = new Channel(target, credentials, channelOptions);
return new ClientRunnerImpl(channel,
config.ClientType,
config.RpcType,
config.PayloadConfig,
config.HistogramParams);
var result = new List<Channel>();
for (int i = 0; i < clientChannels; i++)
{
var target = serverTargets.ElementAt(i % serverTargets.Count());
var channel = new Channel(target, credentials, channelOptions);
result.Add(channel);
}
return result;
}
}
@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
readonly List<Channel> channels;
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.BenchmarkServiceClient client;
readonly Task runnerTask;
readonly CancellationTokenSource stoppedCts;
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
this.channels = new List<Channel>(channels);
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
var threadBody = GetThreadBody();
this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
this.runnerTasks = new List<Task>();
foreach (var channel in this.channels)
{
for (int i = 0; i < outstandingRpcsPerChannel; i++)
{
var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
var threadBody = GetThreadBody(channel, timer);
this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning));
}
}
}
public ClientStats GetStats(bool reset)
@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting
public async Task StopAsync()
{
stoppedCts.Cancel();
await runnerTask;
await channel.ShutdownAsync();
foreach (var runnerTask in runnerTasks)
{
await runnerTask;
}
foreach (var channel in channels)
{
await channel.ShutdownAsync();
}
}
private void RunClosedLoopUnary()
private void RunUnary(Channel channel, IInterarrivalTimer timer)
{
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext();
}
}
private async Task RunClosedLoopUnaryAsync()
private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
{
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
}
private async Task RunClosedLoopStreamingAsync()
private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
{
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
// finish the streaming call
@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting
}
}
private async Task RunGenericClosedLoopStreamingAsync()
private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{
var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch();
@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
// finish the streaming call
@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting
}
}
private Action GetThreadBody()
private Action GetThreadBody(Channel channel, IInterarrivalTimer timer)
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
return () =>
{
RunGenericClosedLoopStreamingAsync().Wait();
RunGenericStreamingAsync(channel, timer).Wait();
};
}
@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
return RunClosedLoopUnary;
return () => RunUnary(channel, timer);
}
else if (clientType == ClientType.ASYNC_CLIENT)
{
@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting
case RpcType.UNARY:
return () =>
{
RunClosedLoopUnaryAsync().Wait();
RunUnaryAsync(channel, timer).Wait();
};
case RpcType.STREAMING:
return () =>
{
RunClosedLoopStreamingAsync().Wait();
RunStreamingPingPongAsync(channel, timer).Wait();
};
}
}
@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
}
private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
{
switch (loadParams.LoadCase)
{
case LoadParams.LoadOneofCase.ClosedLoop:
return new ClosedLoopInterarrivalTimer();
case LoadParams.LoadOneofCase.Poisson:
return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
default:
throw new ArgumentException("Unknown load type");
}
}
}
}

@ -115,6 +115,7 @@
<Compile Include="GenericService.cs" />
<Compile Include="GeneratedServiceBaseTest.cs" />
<Compile Include="GeneratedClientTest.cs" />
<Compile Include="InterarrivalTimers.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,148 @@
#region Copyright notice and license
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Testing;
namespace Grpc.IntegrationTesting
{
public interface IInterarrivalTimer
{
void WaitForNext();
Task WaitForNextAsync();
}
/// <summary>
/// Interarrival timer that doesn't wait at all.
/// </summary>
public class ClosedLoopInterarrivalTimer : IInterarrivalTimer
{
public ClosedLoopInterarrivalTimer()
{
}
public void WaitForNext()
{
// NOP
}
public Task WaitForNextAsync()
{
return Task.FromResult<object>(null);
}
}
/// <summary>
/// Interarrival timer that generates Poisson process load.
/// </summary>
public class PoissonInterarrivalTimer : IInterarrivalTimer
{
readonly ExponentialDistribution exponentialDistribution;
DateTime? lastEventTime;
public PoissonInterarrivalTimer(double offeredLoad)
{
this.exponentialDistribution = new ExponentialDistribution(new Random(), offeredLoad);
this.lastEventTime = DateTime.UtcNow;
}
public void WaitForNext()
{
var waitDuration = GetNextWaitDuration();
int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
if (millisTimeout > 0)
{
// TODO(jtattermusch): probably only works well for a relatively low interarrival rate
Thread.Sleep(millisTimeout);
}
}
public async Task WaitForNextAsync()
{
var waitDuration = GetNextWaitDuration();
int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
if (millisTimeout > 0)
{
// TODO(jtattermusch): probably only works well for a relatively low interarrival rate
await Task.Delay(millisTimeout);
}
}
private TimeSpan GetNextWaitDuration()
{
if (!lastEventTime.HasValue)
{
this.lastEventTime = DateTime.Now;
}
var origLastEventTime = this.lastEventTime.Value;
this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next());
return this.lastEventTime.Value - origLastEventTime;
}
/// <summary>
/// Exp generator.
/// </summary>
private class ExponentialDistribution
{
readonly Random random;
readonly double lambda;
readonly double lambdaReciprocal;
public ExponentialDistribution(Random random, double lambda)
{
this.random = random;
this.lambda = lambda;
this.lambdaReciprocal = 1.0 / lambda;
}
public double Next()
{
double uniform = random.NextDouble();
// Use 1.0-uni above to avoid NaN if uni is 0
return lambdaReciprocal * (-Math.Log(1.0 - uniform));
}
}
}
}

@ -359,7 +359,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
md_ary->metadata[md_ary->count].value_length = value_len;
md_ary->count += 1;
}
} else {
} else if (TYPE(val) == T_STRING) {
value_str = RSTRING_PTR(val);
value_len = RSTRING_LEN(val);
if (!grpc_is_binary_header(key_str, key_len) &&
@ -373,6 +373,10 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
md_ary->metadata[md_ary->count].value = value_str;
md_ary->metadata[md_ary->count].value_length = value_len;
md_ary->count += 1;
} else {
rb_raise(rb_eArgError,
"Header values must be of type string or array");
return ST_STOP;
}
return ST_CONTINUE;

@ -193,44 +193,45 @@ describe 'ClientStub' do
describe '#client_streamer' do
shared_examples 'client streaming' do
before(:each) do
server_port = create_test_server
host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
@options = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
it 'should send requests to/receive a reply from a server' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_client_streamer(@sent_msgs, @resp, @pass)
stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
expect(get_response(@stub)).to eq(@resp)
th.join
end
it 'should send metadata to the server ok' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_client_streamer(@sent_msgs, @resp, @pass,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th = run_client_streamer(@sent_msgs, @resp, @pass, @options)
expect(get_response(@stub)).to eq(@resp)
th.join
end
it 'should raise an error if the status is not ok' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_client_streamer(@sent_msgs, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
blk = proc { get_response(stub) }
blk = proc { get_response(@stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
it 'should raise ArgumentError if metadata contains invalid values' do
@options.merge!(k3: 3)
expect do
get_response(@stub)
end.to raise_error(ArgumentError,
/Header values must be of type string or array/)
end
end
describe 'without a call operation' do
def get_response(stub)
stub.client_streamer(@method, @sent_msgs, noop, noop,
k1: 'v1', k2: 'v2')
stub.client_streamer(@method, @sent_msgs, noop, noop, @options)
end
it_behaves_like 'client streaming'
@ -239,7 +240,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
@options.merge(return_op: true))
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
end

@ -0,0 +1,28 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -36,16 +36,17 @@ cd $(dirname $0)/../../..
CONFIG=${CONFIG:-opt}
# build C++ qps worker & driver always - we need at least the driver to
# run any of the scenarios.
# TODO(jtattermusch): not embedding OpenSSL breaks the C# build because
# grpc_csharp_ext needs OpenSSL embedded and some intermediate files from
# this build will be reused.
make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8
for language in $@
do
if [ "$language" == "c++" ]
if [ "$language" != "c++" ]
then
# build C++ qps worker & driver
# TODO(jtattermusch): not embedding OpenSSL breaks the C# build because
# grpc_csharp_ext needs OpenSSL embedded and some intermediate files from
# this build will be reused.
make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8
else
tools/run_tests/run_tests.py -l $language -c $CONFIG --build_only -j 8
fi
done

@ -0,0 +1,153 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# performance scenario configuration for various languages
class CXXLanguage:
def __init__(self):
self.safename = 'cxx'
def worker_cmdline(self):
return ['bins/opt/qps_worker']
def worker_port_offset(self):
return 0
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
# Scenario 1: generic async streaming ping-pong (contentionless latency)
'cpp_async_generic_streaming_ping_pong': [
'--rpc_type=STREAMING',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_GENERIC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--bbuf_req_size=0',
'--bbuf_resp_size=0',
'--async_client_threads=1',
'--async_server_threads=1',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0'],
# Scenario 5: Sync unary ping-pong with protobufs
'cpp_sync_unary_ping_pong_protobuf': [
'--rpc_type=UNARY',
'--client_type=SYNC_CLIENT',
'--server_type=SYNC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--simple_req_size=0',
'--simple_resp_size=0',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'c++'
class CSharpLanguage:
def __init__(self):
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_csharp.sh']
def worker_port_offset(self):
return 100
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
# Scenario 1: generic async streaming ping-pong (contentionless latency)
'csharp_async_generic_streaming_ping_pong': [
'--rpc_type=STREAMING',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_GENERIC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--bbuf_req_size=0',
'--bbuf_resp_size=0',
'--async_client_threads=1',
'--async_server_threads=1',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'csharp'
class NodeLanguage:
def __init__(self):
pass
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_node.sh']
def worker_port_offset(self):
return 200
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
'node_sync_unary_ping_pong_protobuf': [
'--rpc_type=UNARY',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--simple_req_size=0',
'--simple_resp_size=0',
'--secure_test=false',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'node'
LANGUAGES = {
'c++' : CXXLanguage(),
'csharp' : CSharpLanguage(),
'node' : NodeLanguage(),
}

@ -40,6 +40,7 @@ import sys
import tempfile
import time
import uuid
import performance.scenario_config as scenario_config
_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
@ -49,130 +50,6 @@ os.chdir(_ROOT)
_REMOTE_HOST_USERNAME = 'jenkins'
class CXXLanguage:
def __init__(self):
self.safename = 'cxx'
def worker_cmdline(self):
return ['bins/opt/qps_worker']
def worker_port_offset(self):
return 0
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
# Scenario 1: generic async streaming ping-pong (contentionless latency)
'cpp_async_generic_streaming_ping_pong': [
'--rpc_type=STREAMING',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_GENERIC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--bbuf_req_size=0',
'--bbuf_resp_size=0',
'--async_client_threads=1',
'--async_server_threads=1',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0'],
# Scenario 5: Sync unary ping-pong with protobufs
'cpp_sync_unary_ping_pong_protobuf': [
'--rpc_type=UNARY',
'--client_type=SYNC_CLIENT',
'--server_type=SYNC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--simple_req_size=0',
'--simple_resp_size=0',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'c++'
class CSharpLanguage:
def __init__(self):
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_csharp.sh']
def worker_port_offset(self):
return 100
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
# Scenario 1: generic async streaming ping-pong (contentionless latency)
'csharp_async_generic_streaming_ping_pong': [
'--rpc_type=STREAMING',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_GENERIC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--bbuf_req_size=0',
'--bbuf_resp_size=0',
'--async_client_threads=1',
'--async_server_threads=1',
'--secure_test=true',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'csharp'
class NodeLanguage:
def __init__(self):
pass
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_node.sh']
def worker_port_offset(self):
return 200
def scenarios(self):
# TODO(jtattermusch): add more scenarios
return {
'node_sync_unary_ping_pong_protobuf': [
'--rpc_type=UNARY',
'--client_type=ASYNC_CLIENT',
'--server_type=ASYNC_SERVER',
'--outstanding_rpcs_per_channel=1',
'--client_channels=1',
'--simple_req_size=0',
'--simple_resp_size=0',
'--secure_test=false',
'--num_servers=1',
'--num_clients=1',
'--server_core_limit=0',
'--client_core_limit=0']}
def __str__(self):
return 'node'
_LANGUAGES = {
'c++' : CXXLanguage(),
'csharp' : CSharpLanguage(),
'node' : NodeLanguage(),
}
class QpsWorkerJob:
"""Encapsulates a qps worker server job."""
@ -272,7 +149,7 @@ def prepare_remote_hosts(hosts):
sys.exit(1)
def build_on_remote_hosts(hosts, languages=_LANGUAGES.keys(), build_local=False):
def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), build_local=False):
"""Builds performance worker on remote hosts (and maybe also locally)."""
build_timeout = 15*60
build_jobs = []
@ -366,7 +243,7 @@ def finish_qps_workers(jobs):
argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('-l', '--language',
choices=['all'] + sorted(_LANGUAGES.keys()),
choices=['all'] + sorted(scenario_config.LANGUAGES.keys()),
nargs='+',
default=['all'],
help='Languages to benchmark.')
@ -380,9 +257,9 @@ argp.add_argument('--remote_worker_host',
args = argp.parse_args()
languages = set(_LANGUAGES[l]
languages = set(scenario_config.LANGUAGES[l]
for l in itertools.chain.from_iterable(
_LANGUAGES.iterkeys() if x == 'all' else [x]
scenario_config.LANGUAGES.iterkeys() if x == 'all' else [x]
for x in args.language))
# Put together set of remote hosts where to run and build

Loading…
Cancel
Save