From 64ddf13d1454e57df825d2923046e422abeb780c Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 30 Mar 2016 23:53:50 -0700 Subject: [PATCH 1/9] added grpclb support to sockaddr resolver --- .../resolvers/sockaddr_resolver.c | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c index c787bd57d68..b3aaf46ee41 100644 --- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c @@ -35,6 +35,7 @@ #include "src/core/lib/client_config/resolvers/sockaddr_resolver.h" +#include #include #include @@ -265,21 +266,27 @@ static grpc_resolver *sockaddr_create( memset(r, 0, sizeof(*r)); r->lb_policy_name = NULL; - if (0 != strcmp(args->uri->query, "")) { - gpr_slice query_slice; - gpr_slice_buffer query_parts; - - query_slice = - gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); - gpr_slice_buffer_init(&query_parts); - gpr_slice_split(query_slice, "=", &query_parts); - GPR_ASSERT(query_parts.count == 2); - if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { - r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + bool lb_enabled = false; + for (size_t i = 0; i < args->uri->num_query_parts; ++i) { + if (0 == strcmp("lb_policy", args->uri->query_parts[i])) { + GPR_ASSERT(args->uri->query_parts_values[i] != NULL); + r->lb_policy_name = gpr_strdup(args->uri->query_parts_values[i]); + } else if (0 == strcmp("lb_enabled", args->uri->query_parts[i])) { + GPR_ASSERT(args->uri->query_parts_values[i] != NULL); + lb_enabled = (strcmp("0", args->uri->query_parts_values[i]) != 0); } - gpr_slice_buffer_destroy(&query_parts); - gpr_slice_unref(query_slice); } + + if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 && + !lb_enabled) { + /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail + * out, as this is meant mostly for tests. */ + gpr_log(GPR_ERROR, + "Requested 'grpclb' LB policy but resolved addresses don't " + "support load balancing."); + abort(); + } + if (r->lb_policy_name == NULL) { r->lb_policy_name = gpr_strdup(default_lb_policy_name); } From f9a7e2876d3d8dd76292afde2de3086f3cf9e0a9 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 31 Mar 2016 09:35:57 -0700 Subject: [PATCH 2/9] Simplified code. --- .../resolvers/sockaddr_resolver.c | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c index b3aaf46ee41..9e286966e4c 100644 --- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c @@ -265,17 +265,13 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); - r->lb_policy_name = NULL; - bool lb_enabled = false; - for (size_t i = 0; i < args->uri->num_query_parts; ++i) { - if (0 == strcmp("lb_policy", args->uri->query_parts[i])) { - GPR_ASSERT(args->uri->query_parts_values[i] != NULL); - r->lb_policy_name = gpr_strdup(args->uri->query_parts_values[i]); - } else if (0 == strcmp("lb_enabled", args->uri->query_parts[i])) { - GPR_ASSERT(args->uri->query_parts_values[i] != NULL); - lb_enabled = (strcmp("0", args->uri->query_parts_values[i]) != 0); - } - } + r->lb_policy_name = + gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy")); + const char *lb_enabled_qpart = + grpc_uri_get_query_arg(args->uri, "lb_enabled"); + /* anything other than "0" is interpreted as true */ + const bool lb_enabled = + (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0)); if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 && !lb_enabled) { From ac491d8c1f901fd44c613301d247cd1cd6b82d72 Mon Sep 17 00:00:00 2001 From: Rafael Sales Date: Wed, 2 Mar 2016 02:30:29 -0300 Subject: [PATCH 3/9] Raise on unexpected metadata values The existing implementation was causing segmentation fault because src/ruby/ext/grpc/rb_call.c:358 was trying to convert any value type other than Array to String. The Array type is handled in first `if`. This change will cause the Ruby code that sends non-string values to fail with a better message: `ArgumentError: Header values must be of type string or array` --- src/ruby/ext/grpc/rb_call.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index dc80d18b459..f5fdbb2ffd7 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -359,7 +359,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { md_ary->metadata[md_ary->count].value_length = value_len; md_ary->count += 1; } - } else { + } else if (TYPE(val) == T_STRING) { value_str = RSTRING_PTR(val); value_len = RSTRING_LEN(val); if (!grpc_is_binary_header(key_str, key_len) && @@ -373,6 +373,10 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { md_ary->metadata[md_ary->count].value = value_str; md_ary->metadata[md_ary->count].value_length = value_len; md_ary->count += 1; + } else { + rb_raise(rb_eArgError, + "Header values must be of type string or array"); + return ST_STOP; } return ST_CONTINUE; From bc846f72d6e8f60b6e395822306832f7308d3c10 Mon Sep 17 00:00:00 2001 From: Rafael Sales Date: Wed, 2 Mar 2016 08:04:26 -0300 Subject: [PATCH 4/9] Add spec to make sure invalid metadata values raise ArgumentError --- src/ruby/spec/generic/client_stub_spec.rb | 35 ++++++++++++----------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 5e13c25fcf3..dd8e2e9f7a2 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -193,44 +193,45 @@ describe 'ClientStub' do describe '#client_streamer' do shared_examples 'client streaming' do before(:each) do + server_port = create_test_server + host = "localhost:#{server_port}" + @stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) + @options = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end it 'should send requests to/receive a reply from a server' do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_client_streamer(@sent_msgs, @resp, @pass) - stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect(get_response(@stub)).to eq(@resp) th.join end it 'should send metadata to the server ok' do - server_port = create_test_server - host = "localhost:#{server_port}" - th = run_client_streamer(@sent_msgs, @resp, @pass, - k1: 'v1', k2: 'v2') - stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + th = run_client_streamer(@sent_msgs, @resp, @pass, @options) + expect(get_response(@stub)).to eq(@resp) th.join end it 'should raise an error if the status is not ok' do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_client_streamer(@sent_msgs, @resp, @fail) - stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) - blk = proc { get_response(stub) } + blk = proc { get_response(@stub) } expect(&blk).to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @options.merge!(k3: 3) + expect do + get_response(@stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub) - stub.client_streamer(@method, @sent_msgs, noop, noop, - k1: 'v1', k2: 'v2') + stub.client_streamer(@method, @sent_msgs, noop, noop, @options) end it_behaves_like 'client streaming' @@ -239,7 +240,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_response(stub) op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, k1: 'v1', k2: 'v2') + @options.merge(return_op: true)) expect(op).to be_a(GRPC::ActiveCall::Operation) op.execute end From cb2cd26d048ced54ef041ca44e13f23a17cf889b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 1 Apr 2016 13:59:54 -0700 Subject: [PATCH 5/9] Separate configuration from running logic for performance tests --- tools/run_tests/performance/__init__.py | 0 tools/run_tests/performance/config.py | 153 +++++++++++++++++++++++ tools/run_tests/run_performance_tests.py | 133 +------------------- 3 files changed, 158 insertions(+), 128 deletions(-) create mode 100644 tools/run_tests/performance/__init__.py create mode 100644 tools/run_tests/performance/config.py diff --git a/tools/run_tests/performance/__init__.py b/tools/run_tests/performance/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tools/run_tests/performance/config.py b/tools/run_tests/performance/config.py new file mode 100644 index 00000000000..f95e531fa23 --- /dev/null +++ b/tools/run_tests/performance/config.py @@ -0,0 +1,153 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# performance scenario configuration for various languages + +class CXXLanguage: + + def __init__(self): + self.safename = 'cxx' + + def worker_cmdline(self): + return ['bins/opt/qps_worker'] + + def worker_port_offset(self): + return 0 + + def scenarios(self): + # TODO(jtattermusch): add more scenarios + return { + # Scenario 1: generic async streaming ping-pong (contentionless latency) + 'cpp_async_generic_streaming_ping_pong': [ + '--rpc_type=STREAMING', + '--client_type=ASYNC_CLIENT', + '--server_type=ASYNC_GENERIC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--bbuf_req_size=0', + '--bbuf_resp_size=0', + '--async_client_threads=1', + '--async_server_threads=1', + '--secure_test=true', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0'], + # Scenario 5: Sync unary ping-pong with protobufs + 'cpp_sync_unary_ping_pong_protobuf': [ + '--rpc_type=UNARY', + '--client_type=SYNC_CLIENT', + '--server_type=SYNC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--simple_req_size=0', + '--simple_resp_size=0', + '--secure_test=true', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0']} + + def __str__(self): + return 'c++' + + +class CSharpLanguage: + + def __init__(self): + self.safename = str(self) + + def worker_cmdline(self): + return ['tools/run_tests/performance/run_worker_csharp.sh'] + + def worker_port_offset(self): + return 100 + + def scenarios(self): + # TODO(jtattermusch): add more scenarios + return { + # Scenario 1: generic async streaming ping-pong (contentionless latency) + 'csharp_async_generic_streaming_ping_pong': [ + '--rpc_type=STREAMING', + '--client_type=ASYNC_CLIENT', + '--server_type=ASYNC_GENERIC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--bbuf_req_size=0', + '--bbuf_resp_size=0', + '--async_client_threads=1', + '--async_server_threads=1', + '--secure_test=true', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0']} + + def __str__(self): + return 'csharp' + + +class NodeLanguage: + + def __init__(self): + pass + self.safename = str(self) + + def worker_cmdline(self): + return ['tools/run_tests/performance/run_worker_node.sh'] + + def worker_port_offset(self): + return 200 + + def scenarios(self): + # TODO(jtattermusch): add more scenarios + return { + 'node_sync_unary_ping_pong_protobuf': [ + '--rpc_type=UNARY', + '--client_type=ASYNC_CLIENT', + '--server_type=ASYNC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--simple_req_size=0', + '--simple_resp_size=0', + '--secure_test=false', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0']} + + def __str__(self): + return 'node' + + +LANGUAGES = { + 'c++' : CXXLanguage(), + 'csharp' : CSharpLanguage(), + 'node' : NodeLanguage(), +} diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index 0ab3d264a59..16b49a94f7e 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -40,6 +40,7 @@ import sys import tempfile import time import uuid +import performance.config as config _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) @@ -49,130 +50,6 @@ os.chdir(_ROOT) _REMOTE_HOST_USERNAME = 'jenkins' -class CXXLanguage: - - def __init__(self): - self.safename = 'cxx' - - def worker_cmdline(self): - return ['bins/opt/qps_worker'] - - def worker_port_offset(self): - return 0 - - def scenarios(self): - # TODO(jtattermusch): add more scenarios - return { - # Scenario 1: generic async streaming ping-pong (contentionless latency) - 'cpp_async_generic_streaming_ping_pong': [ - '--rpc_type=STREAMING', - '--client_type=ASYNC_CLIENT', - '--server_type=ASYNC_GENERIC_SERVER', - '--outstanding_rpcs_per_channel=1', - '--client_channels=1', - '--bbuf_req_size=0', - '--bbuf_resp_size=0', - '--async_client_threads=1', - '--async_server_threads=1', - '--secure_test=true', - '--num_servers=1', - '--num_clients=1', - '--server_core_limit=0', - '--client_core_limit=0'], - # Scenario 5: Sync unary ping-pong with protobufs - 'cpp_sync_unary_ping_pong_protobuf': [ - '--rpc_type=UNARY', - '--client_type=SYNC_CLIENT', - '--server_type=SYNC_SERVER', - '--outstanding_rpcs_per_channel=1', - '--client_channels=1', - '--simple_req_size=0', - '--simple_resp_size=0', - '--secure_test=true', - '--num_servers=1', - '--num_clients=1', - '--server_core_limit=0', - '--client_core_limit=0']} - - def __str__(self): - return 'c++' - - -class CSharpLanguage: - - def __init__(self): - self.safename = str(self) - - def worker_cmdline(self): - return ['tools/run_tests/performance/run_worker_csharp.sh'] - - def worker_port_offset(self): - return 100 - - def scenarios(self): - # TODO(jtattermusch): add more scenarios - return { - # Scenario 1: generic async streaming ping-pong (contentionless latency) - 'csharp_async_generic_streaming_ping_pong': [ - '--rpc_type=STREAMING', - '--client_type=ASYNC_CLIENT', - '--server_type=ASYNC_GENERIC_SERVER', - '--outstanding_rpcs_per_channel=1', - '--client_channels=1', - '--bbuf_req_size=0', - '--bbuf_resp_size=0', - '--async_client_threads=1', - '--async_server_threads=1', - '--secure_test=true', - '--num_servers=1', - '--num_clients=1', - '--server_core_limit=0', - '--client_core_limit=0']} - - def __str__(self): - return 'csharp' - - -class NodeLanguage: - - def __init__(self): - pass - self.safename = str(self) - - def worker_cmdline(self): - return ['tools/run_tests/performance/run_worker_node.sh'] - - def worker_port_offset(self): - return 200 - - def scenarios(self): - # TODO(jtattermusch): add more scenarios - return { - 'node_sync_unary_ping_pong_protobuf': [ - '--rpc_type=UNARY', - '--client_type=ASYNC_CLIENT', - '--server_type=ASYNC_SERVER', - '--outstanding_rpcs_per_channel=1', - '--client_channels=1', - '--simple_req_size=0', - '--simple_resp_size=0', - '--secure_test=false', - '--num_servers=1', - '--num_clients=1', - '--server_core_limit=0', - '--client_core_limit=0']} - - def __str__(self): - return 'node' - - -_LANGUAGES = { - 'c++' : CXXLanguage(), - 'csharp' : CSharpLanguage(), - 'node' : NodeLanguage(), -} - - class QpsWorkerJob: """Encapsulates a qps worker server job.""" @@ -272,7 +149,7 @@ def prepare_remote_hosts(hosts): sys.exit(1) -def build_on_remote_hosts(hosts, languages=_LANGUAGES.keys(), build_local=False): +def build_on_remote_hosts(hosts, languages=config.LANGUAGES.keys(), build_local=False): """Builds performance worker on remote hosts (and maybe also locally).""" build_timeout = 15*60 build_jobs = [] @@ -366,7 +243,7 @@ def finish_qps_workers(jobs): argp = argparse.ArgumentParser(description='Run performance tests.') argp.add_argument('-l', '--language', - choices=['all'] + sorted(_LANGUAGES.keys()), + choices=['all'] + sorted(config.LANGUAGES.keys()), nargs='+', default=['all'], help='Languages to benchmark.') @@ -380,9 +257,9 @@ argp.add_argument('--remote_worker_host', args = argp.parse_args() -languages = set(_LANGUAGES[l] +languages = set(config.LANGUAGES[l] for l in itertools.chain.from_iterable( - _LANGUAGES.iterkeys() if x == 'all' else [x] + config.LANGUAGES.iterkeys() if x == 'all' else [x] for x in args.language)) # Put together set of remote hosts where to run and build From d92d5c5dd22fd300cb55ef26b7373af4b967d0f5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 4 Apr 2016 13:49:29 -0700 Subject: [PATCH 6/9] Add copyright, tweak name --- tools/run_tests/performance/__init__.py | 28 +++++++++++++++++++ .../{config.py => scenario_config.py} | 0 tools/run_tests/run_performance_tests.py | 10 +++---- 3 files changed, 33 insertions(+), 5 deletions(-) rename tools/run_tests/performance/{config.py => scenario_config.py} (100%) diff --git a/tools/run_tests/performance/__init__.py b/tools/run_tests/performance/__init__.py index e69de29bb2d..100a624dc9c 100644 --- a/tools/run_tests/performance/__init__.py +++ b/tools/run_tests/performance/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/tools/run_tests/performance/config.py b/tools/run_tests/performance/scenario_config.py similarity index 100% rename from tools/run_tests/performance/config.py rename to tools/run_tests/performance/scenario_config.py diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index 16b49a94f7e..e1268e2ecbb 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -40,7 +40,7 @@ import sys import tempfile import time import uuid -import performance.config as config +import performance.scenario_config as scenario_config _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) @@ -149,7 +149,7 @@ def prepare_remote_hosts(hosts): sys.exit(1) -def build_on_remote_hosts(hosts, languages=config.LANGUAGES.keys(), build_local=False): +def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), build_local=False): """Builds performance worker on remote hosts (and maybe also locally).""" build_timeout = 15*60 build_jobs = [] @@ -243,7 +243,7 @@ def finish_qps_workers(jobs): argp = argparse.ArgumentParser(description='Run performance tests.') argp.add_argument('-l', '--language', - choices=['all'] + sorted(config.LANGUAGES.keys()), + choices=['all'] + sorted(scenario_config.LANGUAGES.keys()), nargs='+', default=['all'], help='Languages to benchmark.') @@ -257,9 +257,9 @@ argp.add_argument('--remote_worker_host', args = argp.parse_args() -languages = set(config.LANGUAGES[l] +languages = set(scenario_config.LANGUAGES[l] for l in itertools.chain.from_iterable( - config.LANGUAGES.iterkeys() if x == 'all' else [x] + scenario_config.LANGUAGES.iterkeys() if x == 'all' else [x] for x in args.language)) # Put together set of remote hosts where to run and build From 6dd74fcb91557d308bc3a143cfb7a651911342e2 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 4 Apr 2016 14:16:10 -0700 Subject: [PATCH 7/9] always build qps_driver --- tools/run_tests/performance/build_performance.sh | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tools/run_tests/performance/build_performance.sh b/tools/run_tests/performance/build_performance.sh index 829c2e30405..7cb806e0bcc 100755 --- a/tools/run_tests/performance/build_performance.sh +++ b/tools/run_tests/performance/build_performance.sh @@ -36,16 +36,17 @@ cd $(dirname $0)/../../.. CONFIG=${CONFIG:-opt} +# build C++ qps worker & driver always - we need at least the driver to +# run any of the scenarios. +# TODO(jtattermusch): not embedding OpenSSL breaks the C# build because +# grpc_csharp_ext needs OpenSSL embedded and some intermediate files from +# this build will be reused. +make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8 + for language in $@ do - if [ "$language" == "c++" ] + if [ "$language" != "c++" ] then - # build C++ qps worker & driver - # TODO(jtattermusch): not embedding OpenSSL breaks the C# build because - # grpc_csharp_ext needs OpenSSL embedded and some intermediate files from - # this build will be reused. - make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8 - else tools/run_tests/run_tests.py -l $language -c $CONFIG --build_only -j 8 fi done From e81adf3b7b61ff3b0f8918c085fcd7004b2ed53d Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 4 Apr 2016 15:59:50 -0700 Subject: [PATCH 8/9] Add more features for C# performance worker --- .../Grpc.IntegrationTesting/ClientRunners.cs | 123 +++++++++----- .../Grpc.IntegrationTesting.csproj | 1 + .../InterarrivalTimers.cs | 150 ++++++++++++++++++ 3 files changed, 235 insertions(+), 39 deletions(-) create mode 100644 src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 0bcacf76e50..f954ca5f34c 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting public static IClientRunner CreateStarted(ClientConfig config) { Logger.Debug("ClientConfig: {0}", config); - string target = config.ServerTargets.Single(); - GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, - "Only closed loop scenario supported for C#"); - GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); - if (config.OutstandingRpcsPerChannel != 0) - { - Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value"); - } if (config.AsyncClientThreads != 0) { Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value"); @@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value"); } - var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; + var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams); + + return new ClientRunnerImpl(channels, + config.ClientType, + config.RpcType, + config.OutstandingRpcsPerChannel, + config.LoadParams, + config.PayloadConfig, + config.HistogramParams); + } + + private static List CreateChannels(int clientChannels, IEnumerable serverTargets, SecurityParams securityParams) + { + GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1."); + GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified."); + + var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; List channelOptions = null; - if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "") + if (securityParams != null && securityParams.ServerHostOverride != "") { channelOptions = new List { - new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride) + new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride) }; } - var channel = new Channel(target, credentials, channelOptions); - return new ClientRunnerImpl(channel, - config.ClientType, - config.RpcType, - config.PayloadConfig, - config.HistogramParams); + var result = new List(); + for (int i = 0; i < clientChannels; i++) + { + var target = serverTargets.ElementAt(i % serverTargets.Count()); + var channel = new Channel(target, credentials, channelOptions); + result.Add(channel); + } + return result; } } @@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting { const double SecondsToNanos = 1e9; - readonly Channel channel; + readonly List channels; readonly ClientType clientType; readonly RpcType rpcType; readonly PayloadConfig payloadConfig; readonly Histogram histogram; - readonly BenchmarkService.BenchmarkServiceClient client; - readonly Task runnerTask; - readonly CancellationTokenSource stoppedCts; + readonly List runnerTasks; + readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(List channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams) { - this.channel = GrpcPreconditions.CheckNotNull(channel); + GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel"); + this.channels = new List(channels); this.clientType = clientType; this.rpcType = rpcType; this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); - this.stoppedCts = new CancellationTokenSource(); - this.client = BenchmarkService.NewClient(channel); - - var threadBody = GetThreadBody(); - this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); + this.runnerTasks = new List(); + foreach (var channel in this.channels) + { + for (int i = 0; i < outstandingRpcsPerChannel; i++) + { + var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel); + var threadBody = GetThreadBody(channel, timer); + this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning)); + } + } } public ClientStats GetStats(bool reset) @@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting public async Task StopAsync() { stoppedCts.Cancel(); - await runnerTask; - await channel.ShutdownAsync(); + foreach (var runnerTask in runnerTasks) + { + await runnerTask; + } + foreach (var channel in channels) + { + await channel.ShutdownAsync(); + } } - private void RunClosedLoopUnary() + private void RunUnary(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + timer.WaitForNext(); } } - private async Task RunClosedLoopUnaryAsync() + private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } } - private async Task RunClosedLoopStreamingAsync() + private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting } } - private async Task RunGenericClosedLoopStreamingAsync() + private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) { var request = CreateByteBufferRequest(); var stopwatch = new Stopwatch(); @@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting } } - private Action GetThreadBody() + private Action GetThreadBody(Channel channel, IInterarrivalTimer timer) { if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) { @@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); return () => { - RunGenericClosedLoopStreamingAsync().Wait(); + RunGenericStreamingAsync(channel, timer).Wait(); }; } @@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); - return RunClosedLoopUnary; + return () => RunUnary(channel, timer); } else if (clientType == ClientType.ASYNC_CLIENT) { @@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting case RpcType.UNARY: return () => { - RunClosedLoopUnaryAsync().Wait(); + RunUnaryAsync(channel, timer).Wait(); }; case RpcType.STREAMING: return () => { - RunClosedLoopStreamingAsync().Wait(); + RunStreamingPingPongAsync(channel, timer).Wait(); }; } } @@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; } + + private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier) + { + switch (loadParams.LoadCase) + { + case LoadParams.LoadOneofCase.ClosedLoop: + return new ClosedLoopInterarrivalTimer(); + case LoadParams.LoadOneofCase.Poisson: + return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier); + default: + throw new ArgumentException("Unknown load type"); + } + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 64d14b0df55..7ea80b11f0d 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -115,6 +115,7 @@ + diff --git a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs new file mode 100644 index 00000000000..e8d7cbe8bb2 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs @@ -0,0 +1,150 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + public interface IInterarrivalTimer + { + void WaitForNext(); + + Task WaitForNextAsync(); + } + + /// + /// Interarrival timer that doesn't wait at all. + /// + public class ClosedLoopInterarrivalTimer : IInterarrivalTimer + { + public ClosedLoopInterarrivalTimer() + { + } + + public void WaitForNext() + { + // NOP + } + + public Task WaitForNextAsync() + { + return Task.FromResult(null); + } + } + + /// + /// Interarrival timer that generates Poisson process load. + /// + public class PoissonInterarrivalTimer : IInterarrivalTimer + { + const double NanosToSeconds = 1e-9; + + readonly ExponentialDistribution exponentialDistribution; + DateTime? lastEventTime; + + public PoissonInterarrivalTimer(double offeredLoad) + { + this.exponentialDistribution = new ExponentialDistribution(new Random(), offeredLoad); + this.lastEventTime = DateTime.UtcNow; + } + + public void WaitForNext() + { + var waitDuration = GetNextWaitDuration(); + int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds); + if (millisTimeout > 0) + { + // TODO(jtattermusch): probably only works well for a relatively low interarrival rate + Thread.Sleep(millisTimeout); + } + } + + public async Task WaitForNextAsync() + { + var waitDuration = GetNextWaitDuration(); + int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds); + if (millisTimeout > 0) + { + // TODO(jtattermusch): probably only works well for a relatively low interarrival rate + await Task.Delay(millisTimeout); + } + } + + private TimeSpan GetNextWaitDuration() + { + if (!lastEventTime.HasValue) + { + this.lastEventTime = DateTime.Now; + } + + var origLastEventTime = this.lastEventTime.Value; + this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next() * NanosToSeconds); + return this.lastEventTime.Value - origLastEventTime; + } + + /// + /// Exp generator. + /// + private class ExponentialDistribution + { + readonly Random random; + readonly double lambda; + readonly double lambdaReciprocal; + + public ExponentialDistribution(Random random, double lambda) + { + this.random = random; + this.lambda = lambda; + this.lambdaReciprocal = 1.0 / lambda; + } + + public double Next() + { + double uniform = random.NextDouble(); + // Use 1.0-uni above to avoid NaN if uni is 0 + return lambdaReciprocal * (-Math.Log(1.0 - uniform)); + } + } + } +} From eff9cf0d6794203b17d17d129a311ccf9fb9c33f Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 5 Apr 2016 10:37:35 -0700 Subject: [PATCH 9/9] make interarrival timer generate the right QPS with poisson load --- src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs index e8d7cbe8bb2..6492d34890d 100644 --- a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs +++ b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs @@ -78,8 +78,6 @@ namespace Grpc.IntegrationTesting /// public class PoissonInterarrivalTimer : IInterarrivalTimer { - const double NanosToSeconds = 1e-9; - readonly ExponentialDistribution exponentialDistribution; DateTime? lastEventTime; @@ -119,7 +117,7 @@ namespace Grpc.IntegrationTesting } var origLastEventTime = this.lastEventTime.Value; - this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next() * NanosToSeconds); + this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next()); return this.lastEventTime.Value - origLastEventTime; }