Add two new soak interop tests

pull/16044/head
ncteisen 7 years ago
parent 3904de99b8
commit b6597b4fbd
  1. 11
      test/cpp/interop/client.cc
  2. 60
      test/cpp/interop/interop_client.cc
  3. 20
      test/cpp/interop/interop_client.h
  4. 6
      test/cpp/interop/stress_interop_client.cc
  5. 4
      test/cpp/interop/stress_interop_client.h
  6. 11
      test/cpp/interop/stress_test.cc

@ -46,6 +46,7 @@ DEFINE_string(
"all : all test cases;\n" "all : all test cases;\n"
"cancel_after_begin : cancel stream after starting it;\n" "cancel_after_begin : cancel stream after starting it;\n"
"cancel_after_first_response: cancel on first response;\n" "cancel_after_first_response: cancel on first response;\n"
"channel_soak: sends 1000 rpcs, tearing down the channel each time;\n"
"client_compressed_streaming : compressed request streaming with " "client_compressed_streaming : compressed request streaming with "
"client_compressed_unary : single compressed request;\n" "client_compressed_unary : single compressed request;\n"
"client_streaming : request streaming with single response;\n" "client_streaming : request streaming with single response;\n"
@ -60,6 +61,7 @@ DEFINE_string(
"per_rpc_creds: raw oauth2 access token on a single rpc;\n" "per_rpc_creds: raw oauth2 access token on a single rpc;\n"
"ping_pong : full-duplex streaming;\n" "ping_pong : full-duplex streaming;\n"
"response streaming;\n" "response streaming;\n"
"rpc_soak: sends 1000 large_unary rpcs;\n"
"server_compressed_streaming : single request with compressed " "server_compressed_streaming : single request with compressed "
"server_compressed_unary : single compressed response;\n" "server_compressed_unary : single compressed response;\n"
"server_streaming : single request with response streaming;\n" "server_streaming : single request with response streaming;\n"
@ -91,8 +93,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0; int ret = 0;
grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), grpc::testing::ChannelCreationFunc channel_creation_func =
true, std::bind(&CreateChannelForTestCase, FLAGS_test_case);
grpc::testing::InteropClient client(channel_creation_func, true,
FLAGS_do_not_abort_on_transient_failures); FLAGS_do_not_abort_on_transient_failures);
std::unordered_map<grpc::string, std::function<bool()>> actions; std::unordered_map<grpc::string, std::function<bool()>> actions;
@ -151,6 +154,10 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
actions["cacheable_unary"] = actions["cacheable_unary"] =
std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
actions["channel_soak"] =
std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client);
actions["rpc_soak"] =
std::bind(&grpc::testing::InteropClient::DoRpcSoakTest, &client);
UpdateActions(&actions); UpdateActions(&actions);

@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
} }
} // namespace } // namespace
InteropClient::ServiceStub::ServiceStub(const std::shared_ptr<Channel>& channel, InteropClient::ServiceStub::ServiceStub(
bool new_stub_every_call) ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
: channel_(channel), new_stub_every_call_(new_stub_every_call) { : channel_creation_func_(channel_creation_func),
channel_(channel_creation_func_()),
new_stub_every_call_(new_stub_every_call) {
// If new_stub_every_call is false, then this is our chance to initialize // If new_stub_every_call is false, then this is our chance to initialize
// stub_. (see Get()) // stub_. (see Get())
if (!new_stub_every_call) { if (!new_stub_every_call) {
stub_ = TestService::NewStub(channel); stub_ = TestService::NewStub(channel_);
} }
} }
@ -100,27 +102,19 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() {
return unimplemented_service_stub_.get(); return unimplemented_service_stub_.get();
} }
void InteropClient::ServiceStub::Reset( void InteropClient::ServiceStub::ResetChannel() {
const std::shared_ptr<Channel>& channel) { channel_ = channel_creation_func_();
channel_ = channel; if (!new_stub_every_call_) {
stub_ = TestService::NewStub(channel_);
// Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
// the stub_ since the next call to Get() will create a new stub
if (new_stub_every_call_) {
stub_.reset();
} else { } else {
stub_ = TestService::NewStub(channel); stub_.reset();
}
} }
void InteropClient::Reset(const std::shared_ptr<Channel>& channel) {
serviceStub_.Reset(std::move(channel));
} }
InteropClient::InteropClient(const std::shared_ptr<Channel>& channel, InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case, bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures) bool do_not_abort_on_transient_failures)
: serviceStub_(std::move(channel), new_stub_every_test_case), : serviceStub_(channel_creation_func, new_stub_every_test_case),
do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
bool InteropClient::AssertStatusOk(const Status& s, bool InteropClient::AssertStatusOk(const Status& s,
@ -1028,6 +1022,34 @@ bool InteropClient::DoCustomMetadata() {
return true; return true;
} }
bool InteropClient::DoRpcSoakTest() {
gpr_log(GPR_DEBUG, "Sending 1000 RPCs...");
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < 1000; ++i) {
if (!PerformLargeUnary(&request, &response)) {
return false;
}
}
gpr_log(GPR_DEBUG, "rpc_soak test done.");
return true;
}
bool InteropClient::DoChannelSoakTest() {
gpr_log(GPR_DEBUG,
"Sending 1000 RPCs, tearing down the channel each time...");
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < 1000; ++i) {
serviceStub_.ResetChannel();
if (!PerformLargeUnary(&request, &response)) {
return false;
}
}
gpr_log(GPR_DEBUG, "channel_soak test done.");
return true;
}
bool InteropClient::DoUnimplementedService() { bool InteropClient::DoUnimplementedService() {
gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service..."); gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");

@ -34,13 +34,15 @@ typedef std::function<void(const InteropClientContextInspector&,
const SimpleRequest*, const SimpleResponse*)> const SimpleRequest*, const SimpleResponse*)>
CheckerFn; CheckerFn;
typedef std::function<std::shared_ptr<Channel>(void)> ChannelCreationFunc;
class InteropClient { class InteropClient {
public: public:
/// If new_stub_every_test_case is true, a new TestService::Stub object is /// If new_stub_every_test_case is true, a new TestService::Stub object is
/// created for every test case /// created for every test case
/// If do_not_abort_on_transient_failures is true, abort() is not called in /// If do_not_abort_on_transient_failures is true, abort() is not called in
/// case of transient failures (like connection failures) /// case of transient failures (like connection failures)
explicit InteropClient(const std::shared_ptr<Channel>& channel, explicit InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case, bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures); bool do_not_abort_on_transient_failures);
~InteropClient() {} ~InteropClient() {}
@ -67,6 +69,14 @@ class InteropClient {
bool DoUnimplementedMethod(); bool DoUnimplementedMethod();
bool DoUnimplementedService(); bool DoUnimplementedService();
bool DoCacheableUnary(); bool DoCacheableUnary();
// The following interop test are not yet part of the interop spec, and are
// not implemented cross-language. They are considered experimental for now,
// but at some point in the future, might be codified and implemented in all
// languages
bool DoChannelSoakTest();
bool DoRpcSoakTest();
// Auth tests. // Auth tests.
// username is a string containing the user email // username is a string containing the user email
bool DoJwtTokenCreds(const grpc::string& username); bool DoJwtTokenCreds(const grpc::string& username);
@ -83,15 +93,17 @@ class InteropClient {
public: public:
// If new_stub_every_call = true, pointer to a new instance of // If new_stub_every_call = true, pointer to a new instance of
// TestServce::Stub is returned by Get() everytime it is called // TestServce::Stub is returned by Get() everytime it is called
ServiceStub(const std::shared_ptr<Channel>& channel, ServiceStub(ChannelCreationFunc channel_creation_func,
bool new_stub_every_call); bool new_stub_every_call);
TestService::Stub* Get(); TestService::Stub* Get();
UnimplementedService::Stub* GetUnimplementedServiceStub(); UnimplementedService::Stub* GetUnimplementedServiceStub();
void Reset(const std::shared_ptr<Channel>& channel); // forces channel to be recreated.
void ResetChannel();
private: private:
ChannelCreationFunc channel_creation_func_;
std::unique_ptr<TestService::Stub> stub_; std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_; std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
std::shared_ptr<Channel> channel_; std::shared_ptr<Channel> channel_;
@ -109,8 +121,8 @@ class InteropClient {
bool AssertStatusCode(const Status& s, StatusCode expected_code, bool AssertStatusCode(const Status& s, StatusCode expected_code,
const grpc::string& optional_debug_string); const grpc::string& optional_debug_string);
bool TransientFailureOrAbort(); bool TransientFailureOrAbort();
ServiceStub serviceStub_;
ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures /// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_; bool do_not_abort_on_transient_failures_;
}; };

@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const {
StressTestInteropClient::StressTestInteropClient( StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address, int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel, ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector, long test_duration_secs, const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms, bool do_not_abort_on_transient_failures) long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id), : test_id_(test_id),
server_address_(server_address), server_address_(server_address),
channel_(channel), channel_creation_func_(channel_creation_func),
interop_client_(new InteropClient(channel, false, interop_client_(new InteropClient(channel_creation_func_, false,
do_not_abort_on_transient_failures)), do_not_abort_on_transient_failures)),
test_selector_(test_selector), test_selector_(test_selector),
test_duration_secs_(test_duration_secs), test_duration_secs_(test_duration_secs),

@ -91,7 +91,7 @@ class WeightedRandomTestSelector {
class StressTestInteropClient { class StressTestInteropClient {
public: public:
StressTestInteropClient(int test_id, const grpc::string& server_address, StressTestInteropClient(int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel, ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector, const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms, long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures); bool do_not_abort_on_transient_failures);
@ -105,7 +105,7 @@ class StressTestInteropClient {
int test_id_; int test_id_;
const grpc::string& server_address_; const grpc::string& server_address_;
std::shared_ptr<Channel> channel_; ChannelCreationFunc channel_creation_func_;
std::unique_ptr<InteropClient> interop_client_; std::unique_ptr<InteropClient> interop_client_;
const WeightedRandomTestSelector& test_selector_; const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_; long test_duration_secs_;

@ -283,15 +283,20 @@ int main(int argc, char** argv) {
channel_idx++) { channel_idx++) {
gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(), gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
channel_idx); channel_idx);
std::shared_ptr<grpc::Channel> channel = grpc::CreateTestChannel( grpc::testing::ChannelCreationFunc channel_creation_func = std::bind(
static_cast<std::shared_ptr<grpc::Channel> (*)(
const grpc::string&, const grpc::string&,
grpc::testing::transport_security, bool)>(
grpc::CreateTestChannel),
*it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca); *it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);
// Create stub(s) for each channel // Create stub(s) for each channel
for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel; for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
stub_idx++) { stub_idx++) {
clients.emplace_back(new StressTestInteropClient( clients.emplace_back(new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, ++thread_idx, *it, channel_creation_func, test_selector,
FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures)); FLAGS_test_duration_secs, FLAGS_sleep_duration_ms,
FLAGS_do_not_abort_on_transient_failures));
bool is_already_created = false; bool is_already_created = false;
// QpsGauge name // QpsGauge name

Loading…
Cancel
Save