diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 3eb155ef95c..3fae443681b 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -46,6 +46,7 @@ DEFINE_string( "all : all test cases;\n" "cancel_after_begin : cancel stream after starting it;\n" "cancel_after_first_response: cancel on first response;\n" + "channel_soak: sends 1000 rpcs, tearing down the channel each time;\n" "client_compressed_streaming : compressed request streaming with " "client_compressed_unary : single compressed request;\n" "client_streaming : request streaming with single response;\n" @@ -60,6 +61,7 @@ DEFINE_string( "per_rpc_creds: raw oauth2 access token on a single rpc;\n" "ping_pong : full-duplex streaming;\n" "response streaming;\n" + "rpc_soak: sends 1000 large_unary rpcs;\n" "server_compressed_streaming : single request with compressed " "server_compressed_unary : single compressed response;\n" "server_streaming : single request with response streaming;\n" @@ -91,8 +93,9 @@ int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); int ret = 0; - grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), - true, + grpc::testing::ChannelCreationFunc channel_creation_func = + std::bind(&CreateChannelForTestCase, FLAGS_test_case); + grpc::testing::InteropClient client(channel_creation_func, true, FLAGS_do_not_abort_on_transient_failures); std::unordered_map> actions; @@ -151,6 +154,10 @@ int main(int argc, char** argv) { std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); actions["cacheable_unary"] = std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + actions["channel_soak"] = + std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client); + actions["rpc_soak"] = + std::bind(&grpc::testing::InteropClient::DoRpcSoakTest, &client); UpdateActions(&actions); diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index fce99a1697b..a023e8776bf 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector, } } // namespace -InteropClient::ServiceStub::ServiceStub(const std::shared_ptr& channel, - bool new_stub_every_call) - : channel_(channel), new_stub_every_call_(new_stub_every_call) { +InteropClient::ServiceStub::ServiceStub( + ChannelCreationFunc channel_creation_func, bool new_stub_every_call) + : channel_creation_func_(channel_creation_func), + channel_(channel_creation_func_()), + new_stub_every_call_(new_stub_every_call) { // If new_stub_every_call is false, then this is our chance to initialize // stub_. (see Get()) if (!new_stub_every_call) { - stub_ = TestService::NewStub(channel); + stub_ = TestService::NewStub(channel_); } } @@ -100,27 +102,19 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() { return unimplemented_service_stub_.get(); } -void InteropClient::ServiceStub::Reset( - const std::shared_ptr& channel) { - channel_ = channel; - - // Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset - // the stub_ since the next call to Get() will create a new stub - if (new_stub_every_call_) { - stub_.reset(); +void InteropClient::ServiceStub::ResetChannel() { + channel_ = channel_creation_func_(); + if (!new_stub_every_call_) { + stub_ = TestService::NewStub(channel_); } else { - stub_ = TestService::NewStub(channel); + stub_.reset(); } } -void InteropClient::Reset(const std::shared_ptr& channel) { - serviceStub_.Reset(std::move(channel)); -} - -InteropClient::InteropClient(const std::shared_ptr& channel, +InteropClient::InteropClient(ChannelCreationFunc channel_creation_func, bool new_stub_every_test_case, bool do_not_abort_on_transient_failures) - : serviceStub_(std::move(channel), new_stub_every_test_case), + : serviceStub_(channel_creation_func, new_stub_every_test_case), do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} bool InteropClient::AssertStatusOk(const Status& s, @@ -1028,6 +1022,34 @@ bool InteropClient::DoCustomMetadata() { return true; } +bool InteropClient::DoRpcSoakTest() { + gpr_log(GPR_DEBUG, "Sending 1000 RPCs..."); + SimpleRequest request; + SimpleResponse response; + for (int i = 0; i < 1000; ++i) { + if (!PerformLargeUnary(&request, &response)) { + return false; + } + } + gpr_log(GPR_DEBUG, "rpc_soak test done."); + return true; +} + +bool InteropClient::DoChannelSoakTest() { + gpr_log(GPR_DEBUG, + "Sending 1000 RPCs, tearing down the channel each time..."); + SimpleRequest request; + SimpleResponse response; + for (int i = 0; i < 1000; ++i) { + serviceStub_.ResetChannel(); + if (!PerformLargeUnary(&request, &response)) { + return false; + } + } + gpr_log(GPR_DEBUG, "channel_soak test done."); + return true; +} + bool InteropClient::DoUnimplementedService() { gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service..."); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 480eb3f4b62..3033d7b4140 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -34,13 +34,15 @@ typedef std::function CheckerFn; +typedef std::function(void)> ChannelCreationFunc; + class InteropClient { public: /// If new_stub_every_test_case is true, a new TestService::Stub object is /// created for every test case /// If do_not_abort_on_transient_failures is true, abort() is not called in /// case of transient failures (like connection failures) - explicit InteropClient(const std::shared_ptr& channel, + explicit InteropClient(ChannelCreationFunc channel_creation_func, bool new_stub_every_test_case, bool do_not_abort_on_transient_failures); ~InteropClient() {} @@ -67,6 +69,14 @@ class InteropClient { bool DoUnimplementedMethod(); bool DoUnimplementedService(); bool DoCacheableUnary(); + + // The following interop test are not yet part of the interop spec, and are + // not implemented cross-language. They are considered experimental for now, + // but at some point in the future, might be codified and implemented in all + // languages + bool DoChannelSoakTest(); + bool DoRpcSoakTest(); + // Auth tests. // username is a string containing the user email bool DoJwtTokenCreds(const grpc::string& username); @@ -83,15 +93,17 @@ class InteropClient { public: // If new_stub_every_call = true, pointer to a new instance of // TestServce::Stub is returned by Get() everytime it is called - ServiceStub(const std::shared_ptr& channel, + ServiceStub(ChannelCreationFunc channel_creation_func, bool new_stub_every_call); TestService::Stub* Get(); UnimplementedService::Stub* GetUnimplementedServiceStub(); - void Reset(const std::shared_ptr& channel); + // forces channel to be recreated. + void ResetChannel(); private: + ChannelCreationFunc channel_creation_func_; std::unique_ptr stub_; std::unique_ptr unimplemented_service_stub_; std::shared_ptr channel_; @@ -109,8 +121,8 @@ class InteropClient { bool AssertStatusCode(const Status& s, StatusCode expected_code, const grpc::string& optional_debug_string); bool TransientFailureOrAbort(); - ServiceStub serviceStub_; + ServiceStub serviceStub_; /// If true, abort() is not called for transient failures bool do_not_abort_on_transient_failures_; }; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index 9d373c3cd9e..7dc1956f783 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const { StressTestInteropClient::StressTestInteropClient( int test_id, const grpc::string& server_address, - const std::shared_ptr& channel, + ChannelCreationFunc channel_creation_func, const WeightedRandomTestSelector& test_selector, long test_duration_secs, long sleep_duration_ms, bool do_not_abort_on_transient_failures) : test_id_(test_id), server_address_(server_address), - channel_(channel), - interop_client_(new InteropClient(channel, false, + channel_creation_func_(channel_creation_func), + interop_client_(new InteropClient(channel_creation_func_, false, do_not_abort_on_transient_failures)), test_selector_(test_selector), test_duration_secs_(test_duration_secs), diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index e4fa7d09737..58680d80935 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -91,7 +91,7 @@ class WeightedRandomTestSelector { class StressTestInteropClient { public: StressTestInteropClient(int test_id, const grpc::string& server_address, - const std::shared_ptr& channel, + ChannelCreationFunc channel_creation_func, const WeightedRandomTestSelector& test_selector, long test_duration_secs, long sleep_duration_ms, bool do_not_abort_on_transient_failures); @@ -105,7 +105,7 @@ class StressTestInteropClient { int test_id_; const grpc::string& server_address_; - std::shared_ptr channel_; + ChannelCreationFunc channel_creation_func_; std::unique_ptr interop_client_; const WeightedRandomTestSelector& test_selector_; long test_duration_secs_; diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index 023e0c8f0b2..ebbd14beba2 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -283,15 +283,20 @@ int main(int argc, char** argv) { channel_idx++) { gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(), channel_idx); - std::shared_ptr channel = grpc::CreateTestChannel( + grpc::testing::ChannelCreationFunc channel_creation_func = std::bind( + static_cast (*)( + const grpc::string&, const grpc::string&, + grpc::testing::transport_security, bool)>( + grpc::CreateTestChannel), *it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca); // Create stub(s) for each channel for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel; stub_idx++) { clients.emplace_back(new StressTestInteropClient( - ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, - FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures)); + ++thread_idx, *it, channel_creation_func, test_selector, + FLAGS_test_duration_secs, FLAGS_sleep_duration_ms, + FLAGS_do_not_abort_on_transient_failures)); bool is_already_created = false; // QpsGauge name