From b6597b4fbde40b61200d363a195239e19e0b599c Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 17 Jul 2018 16:36:59 -0700 Subject: [PATCH 1/3] Add two new soak interop tests --- test/cpp/interop/client.cc | 11 ++++- test/cpp/interop/interop_client.cc | 60 ++++++++++++++++------- test/cpp/interop/interop_client.h | 20 ++++++-- test/cpp/interop/stress_interop_client.cc | 6 +-- test/cpp/interop/stress_interop_client.h | 4 +- test/cpp/interop/stress_test.cc | 11 +++-- 6 files changed, 79 insertions(+), 33 deletions(-) diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 3eb155ef95c..3fae443681b 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -46,6 +46,7 @@ DEFINE_string( "all : all test cases;\n" "cancel_after_begin : cancel stream after starting it;\n" "cancel_after_first_response: cancel on first response;\n" + "channel_soak: sends 1000 rpcs, tearing down the channel each time;\n" "client_compressed_streaming : compressed request streaming with " "client_compressed_unary : single compressed request;\n" "client_streaming : request streaming with single response;\n" @@ -60,6 +61,7 @@ DEFINE_string( "per_rpc_creds: raw oauth2 access token on a single rpc;\n" "ping_pong : full-duplex streaming;\n" "response streaming;\n" + "rpc_soak: sends 1000 large_unary rpcs;\n" "server_compressed_streaming : single request with compressed " "server_compressed_unary : single compressed response;\n" "server_streaming : single request with response streaming;\n" @@ -91,8 +93,9 @@ int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); int ret = 0; - grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), - true, + grpc::testing::ChannelCreationFunc channel_creation_func = + std::bind(&CreateChannelForTestCase, FLAGS_test_case); + grpc::testing::InteropClient client(channel_creation_func, true, FLAGS_do_not_abort_on_transient_failures); std::unordered_map> actions; @@ -151,6 +154,10 @@ int main(int argc, char** argv) { std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); actions["cacheable_unary"] = std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + actions["channel_soak"] = + std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client); + actions["rpc_soak"] = + std::bind(&grpc::testing::InteropClient::DoRpcSoakTest, &client); UpdateActions(&actions); diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index fce99a1697b..a023e8776bf 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector, } } // namespace -InteropClient::ServiceStub::ServiceStub(const std::shared_ptr& channel, - bool new_stub_every_call) - : channel_(channel), new_stub_every_call_(new_stub_every_call) { +InteropClient::ServiceStub::ServiceStub( + ChannelCreationFunc channel_creation_func, bool new_stub_every_call) + : channel_creation_func_(channel_creation_func), + channel_(channel_creation_func_()), + new_stub_every_call_(new_stub_every_call) { // If new_stub_every_call is false, then this is our chance to initialize // stub_. (see Get()) if (!new_stub_every_call) { - stub_ = TestService::NewStub(channel); + stub_ = TestService::NewStub(channel_); } } @@ -100,27 +102,19 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() { return unimplemented_service_stub_.get(); } -void InteropClient::ServiceStub::Reset( - const std::shared_ptr& channel) { - channel_ = channel; - - // Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset - // the stub_ since the next call to Get() will create a new stub - if (new_stub_every_call_) { - stub_.reset(); +void InteropClient::ServiceStub::ResetChannel() { + channel_ = channel_creation_func_(); + if (!new_stub_every_call_) { + stub_ = TestService::NewStub(channel_); } else { - stub_ = TestService::NewStub(channel); + stub_.reset(); } } -void InteropClient::Reset(const std::shared_ptr& channel) { - serviceStub_.Reset(std::move(channel)); -} - -InteropClient::InteropClient(const std::shared_ptr& channel, +InteropClient::InteropClient(ChannelCreationFunc channel_creation_func, bool new_stub_every_test_case, bool do_not_abort_on_transient_failures) - : serviceStub_(std::move(channel), new_stub_every_test_case), + : serviceStub_(channel_creation_func, new_stub_every_test_case), do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} bool InteropClient::AssertStatusOk(const Status& s, @@ -1028,6 +1022,34 @@ bool InteropClient::DoCustomMetadata() { return true; } +bool InteropClient::DoRpcSoakTest() { + gpr_log(GPR_DEBUG, "Sending 1000 RPCs..."); + SimpleRequest request; + SimpleResponse response; + for (int i = 0; i < 1000; ++i) { + if (!PerformLargeUnary(&request, &response)) { + return false; + } + } + gpr_log(GPR_DEBUG, "rpc_soak test done."); + return true; +} + +bool InteropClient::DoChannelSoakTest() { + gpr_log(GPR_DEBUG, + "Sending 1000 RPCs, tearing down the channel each time..."); + SimpleRequest request; + SimpleResponse response; + for (int i = 0; i < 1000; ++i) { + serviceStub_.ResetChannel(); + if (!PerformLargeUnary(&request, &response)) { + return false; + } + } + gpr_log(GPR_DEBUG, "channel_soak test done."); + return true; +} + bool InteropClient::DoUnimplementedService() { gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service..."); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 480eb3f4b62..3033d7b4140 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -34,13 +34,15 @@ typedef std::function CheckerFn; +typedef std::function(void)> ChannelCreationFunc; + class InteropClient { public: /// If new_stub_every_test_case is true, a new TestService::Stub object is /// created for every test case /// If do_not_abort_on_transient_failures is true, abort() is not called in /// case of transient failures (like connection failures) - explicit InteropClient(const std::shared_ptr& channel, + explicit InteropClient(ChannelCreationFunc channel_creation_func, bool new_stub_every_test_case, bool do_not_abort_on_transient_failures); ~InteropClient() {} @@ -67,6 +69,14 @@ class InteropClient { bool DoUnimplementedMethod(); bool DoUnimplementedService(); bool DoCacheableUnary(); + + // The following interop test are not yet part of the interop spec, and are + // not implemented cross-language. They are considered experimental for now, + // but at some point in the future, might be codified and implemented in all + // languages + bool DoChannelSoakTest(); + bool DoRpcSoakTest(); + // Auth tests. // username is a string containing the user email bool DoJwtTokenCreds(const grpc::string& username); @@ -83,15 +93,17 @@ class InteropClient { public: // If new_stub_every_call = true, pointer to a new instance of // TestServce::Stub is returned by Get() everytime it is called - ServiceStub(const std::shared_ptr& channel, + ServiceStub(ChannelCreationFunc channel_creation_func, bool new_stub_every_call); TestService::Stub* Get(); UnimplementedService::Stub* GetUnimplementedServiceStub(); - void Reset(const std::shared_ptr& channel); + // forces channel to be recreated. + void ResetChannel(); private: + ChannelCreationFunc channel_creation_func_; std::unique_ptr stub_; std::unique_ptr unimplemented_service_stub_; std::shared_ptr channel_; @@ -109,8 +121,8 @@ class InteropClient { bool AssertStatusCode(const Status& s, StatusCode expected_code, const grpc::string& optional_debug_string); bool TransientFailureOrAbort(); - ServiceStub serviceStub_; + ServiceStub serviceStub_; /// If true, abort() is not called for transient failures bool do_not_abort_on_transient_failures_; }; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index 9d373c3cd9e..7dc1956f783 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const { StressTestInteropClient::StressTestInteropClient( int test_id, const grpc::string& server_address, - const std::shared_ptr& channel, + ChannelCreationFunc channel_creation_func, const WeightedRandomTestSelector& test_selector, long test_duration_secs, long sleep_duration_ms, bool do_not_abort_on_transient_failures) : test_id_(test_id), server_address_(server_address), - channel_(channel), - interop_client_(new InteropClient(channel, false, + channel_creation_func_(channel_creation_func), + interop_client_(new InteropClient(channel_creation_func_, false, do_not_abort_on_transient_failures)), test_selector_(test_selector), test_duration_secs_(test_duration_secs), diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index e4fa7d09737..58680d80935 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -91,7 +91,7 @@ class WeightedRandomTestSelector { class StressTestInteropClient { public: StressTestInteropClient(int test_id, const grpc::string& server_address, - const std::shared_ptr& channel, + ChannelCreationFunc channel_creation_func, const WeightedRandomTestSelector& test_selector, long test_duration_secs, long sleep_duration_ms, bool do_not_abort_on_transient_failures); @@ -105,7 +105,7 @@ class StressTestInteropClient { int test_id_; const grpc::string& server_address_; - std::shared_ptr channel_; + ChannelCreationFunc channel_creation_func_; std::unique_ptr interop_client_; const WeightedRandomTestSelector& test_selector_; long test_duration_secs_; diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index 023e0c8f0b2..ebbd14beba2 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -283,15 +283,20 @@ int main(int argc, char** argv) { channel_idx++) { gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(), channel_idx); - std::shared_ptr channel = grpc::CreateTestChannel( + grpc::testing::ChannelCreationFunc channel_creation_func = std::bind( + static_cast (*)( + const grpc::string&, const grpc::string&, + grpc::testing::transport_security, bool)>( + grpc::CreateTestChannel), *it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca); // Create stub(s) for each channel for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel; stub_idx++) { clients.emplace_back(new StressTestInteropClient( - ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, - FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures)); + ++thread_idx, *it, channel_creation_func, test_selector, + FLAGS_test_duration_secs, FLAGS_sleep_duration_ms, + FLAGS_do_not_abort_on_transient_failures)); bool is_already_created = false; // QpsGauge name From 12f1cc059b037e3853185d3e740ca2b16f1f5495 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 18 Jul 2018 07:33:08 -0700 Subject: [PATCH 2/3] Reviewer feedback --- test/cpp/interop/client.cc | 15 ++++++++++----- test/cpp/interop/interop_client.cc | 20 +++++++++++--------- test/cpp/interop/interop_client.h | 4 ++-- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 3fae443681b..7bcf23c0eb6 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -46,7 +46,7 @@ DEFINE_string( "all : all test cases;\n" "cancel_after_begin : cancel stream after starting it;\n" "cancel_after_first_response: cancel on first response;\n" - "channel_soak: sends 1000 rpcs, tearing down the channel each time;\n" + "channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each time;\n" "client_compressed_streaming : compressed request streaming with " "client_compressed_unary : single compressed request;\n" "client_streaming : request streaming with single response;\n" @@ -61,7 +61,7 @@ DEFINE_string( "per_rpc_creds: raw oauth2 access token on a single rpc;\n" "ping_pong : full-duplex streaming;\n" "response streaming;\n" - "rpc_soak: sends 1000 large_unary rpcs;\n" + "rpc_soak: 'sends soak_iterations' large_unary rpcs;\n" "server_compressed_streaming : single request with compressed " "server_compressed_unary : single compressed response;\n" "server_streaming : single request with response streaming;\n" @@ -85,6 +85,10 @@ DEFINE_bool(do_not_abort_on_transient_failures, false, "test is retried in case of transient failures (and currently the " "interop tests are not retried even if this flag is set to true)"); +DEFINE_int32(soak_iterations, 1000, + "number of iterations to use for the two soak tests; rpc_soak and " + "channel_soak"); + using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; using grpc::testing::UpdateActions; @@ -155,9 +159,10 @@ int main(int argc, char** argv) { actions["cacheable_unary"] = std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); actions["channel_soak"] = - std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client); - actions["rpc_soak"] = - std::bind(&grpc::testing::InteropClient::DoRpcSoakTest, &client); + std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client, + FLAGS_soak_iterations); + actions["rpc_soak"] = std::bind(&grpc::testing::InteropClient::DoRpcSoakTest, + &client, FLAGS_soak_iterations); UpdateActions(&actions); diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index a023e8776bf..b7ce90803ba 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -106,8 +106,6 @@ void InteropClient::ServiceStub::ResetChannel() { channel_ = channel_creation_func_(); if (!new_stub_every_call_) { stub_ = TestService::NewStub(channel_); - } else { - stub_.reset(); } } @@ -1022,12 +1020,14 @@ bool InteropClient::DoCustomMetadata() { return true; } -bool InteropClient::DoRpcSoakTest() { - gpr_log(GPR_DEBUG, "Sending 1000 RPCs..."); +bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) { + gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations); + GPR_ASSERT(soak_iterations > 0); SimpleRequest request; SimpleResponse response; - for (int i = 0; i < 1000; ++i) { + for (int i = 0; i < soak_iterations; ++i) { if (!PerformLargeUnary(&request, &response)) { + gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i); return false; } } @@ -1035,14 +1035,16 @@ bool InteropClient::DoRpcSoakTest() { return true; } -bool InteropClient::DoChannelSoakTest() { - gpr_log(GPR_DEBUG, - "Sending 1000 RPCs, tearing down the channel each time..."); +bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) { + gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...", + soak_iterations); + GPR_ASSERT(soak_iterations > 0); SimpleRequest request; SimpleResponse response; - for (int i = 0; i < 1000; ++i) { + for (int i = 0; i < soak_iterations; ++i) { serviceStub_.ResetChannel(); if (!PerformLargeUnary(&request, &response)) { + gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i); return false; } } diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 3033d7b4140..e5be44d1d47 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -74,8 +74,8 @@ class InteropClient { // not implemented cross-language. They are considered experimental for now, // but at some point in the future, might be codified and implemented in all // languages - bool DoChannelSoakTest(); - bool DoRpcSoakTest(); + bool DoChannelSoakTest(int32_t soak_iterations); + bool DoRpcSoakTest(int32_t soak_iterations); // Auth tests. // username is a string containing the user email From 21fa7905a3881c67fb6dec41023be58df1dee8dd Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 18 Jul 2018 10:44:27 -0700 Subject: [PATCH 3/3] Add description of new experimental interops --- doc/interop-test-descriptions.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index 0ee2cae2bd6..9781925533f 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -899,6 +899,25 @@ Status: TODO This test verifies that a client sending faster than a server can drain sees pushback (i.e., attempts to send succeed only after appropriate delays). +### Experimental Tests + +These tests are not yet standardized, and are not yet implemented in all +languages. Therefore they are not part of our interop matrix. + +#### rpc_soak + +The client performs many large_unary RPCs in sequence over the same channel. +The number of RPCs is configured by the experimental flag, `soak_iterations`. + +#### channel_soak + +The client performs many large_unary RPCs in sequence. Before each RPC, it +tears down and rebuilds the channel. The number of RPCs is configured by +the experimental flag, `soak_iterations`. + +This tests puts stress on several gRPC components; the resolver, the load +balancer, and the RPC hotpath. + ### TODO Tests #### High priority: