diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index 04671fb935e..f287a5aa3b4 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -84,49 +84,37 @@ StressTestInteropClient::StressTestInteropClient( int test_id, const grpc::string& server_address, std::shared_ptr channel, const WeightedRandomTestSelector& test_selector, long test_duration_secs, - long sleep_duration_ms, long metrics_collection_interval_secs) + long sleep_duration_ms) : test_id_(test_id), server_address_(server_address), channel_(channel), interop_client_(new InteropClient(channel, false)), test_selector_(test_selector), test_duration_secs_(test_duration_secs), - sleep_duration_ms_(sleep_duration_ms), - metrics_collection_interval_secs_(metrics_collection_interval_secs) {} + sleep_duration_ms_(sleep_duration_ms) {} -void StressTestInteropClient::MainLoop(std::shared_ptr qps_gauge) { +void StressTestInteropClient::MainLoop(std::shared_ptr qps_gauge) { gpr_log(GPR_INFO, "Running test %d. ServerAddr: %s", test_id_, server_address_.c_str()); - gpr_timespec test_end_time = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN)); + gpr_timespec test_end_time; + if (test_duration_secs_ < 0) { + test_end_time = gpr_inf_future(GPR_CLOCK_REALTIME); + } else { + test_end_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN)); + } - gpr_timespec current_time = gpr_now(GPR_CLOCK_REALTIME); - gpr_timespec next_stat_collection_time = current_time; - gpr_timespec collection_interval = - gpr_time_from_seconds(metrics_collection_interval_secs_, GPR_TIMESPAN); - long num_calls_per_interval = 0; + qps_gauge->Reset(); - while (test_duration_secs_ < 0 || - gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) { + while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) { // Select the test case to execute based on the weights and execute it TestCaseType test_case = test_selector_.GetNextTest(); gpr_log(GPR_DEBUG, "%d - Executing the test case %d", test_id_, test_case); RunTest(test_case); - num_calls_per_interval++; - - // See if its time to collect stats yet - current_time = gpr_now(GPR_CLOCK_REALTIME); - if (gpr_time_cmp(next_stat_collection_time, current_time) < 0) { - qps_gauge->Set(num_calls_per_interval / - metrics_collection_interval_secs_); - - num_calls_per_interval = 0; - next_stat_collection_time = - gpr_time_add(current_time, collection_interval); - } + qps_gauge->Incr(); // Sleep between successive calls if needed if (sleep_duration_ms_ > 0) { diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index 6fd303d6b79..cb0cd988214 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -87,12 +87,11 @@ class StressTestInteropClient { StressTestInteropClient(int test_id, const grpc::string& server_address, std::shared_ptr channel, const WeightedRandomTestSelector& test_selector, - long test_duration_secs, long sleep_duration_ms, - long metrics_collection_interval_secs); + long test_duration_secs, long sleep_duration_ms); // The main function. Use this as the thread entry point. - // qps_gauge is the Gauge to record the requests per second metric - void MainLoop(std::shared_ptr qps_gauge); + // qps_gauge is the QpsGauge to record the requests per second metric + void MainLoop(std::shared_ptr qps_gauge); private: void RunTest(TestCaseType test_case); @@ -104,7 +103,6 @@ class StressTestInteropClient { const WeightedRandomTestSelector& test_selector_; long test_duration_secs_; long sleep_duration_ms_; - long metrics_collection_interval_secs_; }; } // namespace testing diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index 38caf31b76a..d9e3fd25c51 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -56,9 +56,6 @@ extern void gpr_default_log(gpr_log_func_args* args); DEFINE_int32(metrics_port, 8081, "The metrics server port."); -DEFINE_int32(metrics_collection_interval_secs, 5, - "How often (in seconds) should metrics be recorded."); - DEFINE_int32(sleep_duration_ms, 0, "The duration (in millisec) between two" " consecutive test calls (per server) issued by the server."); @@ -275,19 +272,19 @@ int main(int argc, char** argv) { stub_idx++) { StressTestInteropClient* client = new StressTestInteropClient( ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, - FLAGS_sleep_duration_ms, FLAGS_metrics_collection_interval_secs); + FLAGS_sleep_duration_ms); - bool is_already_created; - // Gauge name + bool is_already_created = false; + // QpsGauge name std::snprintf(buffer, sizeof(buffer), "/stress_test/server_%d/channel_%d/stub_%d/qps", server_idx, channel_idx, stub_idx); test_threads.emplace_back(grpc::thread( &StressTestInteropClient::MainLoop, client, - metrics_service.CreateGauge(buffer, &is_already_created))); + metrics_service.CreateQpsGauge(buffer, &is_already_created))); - // The Gauge should not have been already created + // The QpsGauge should not have been already created GPR_ASSERT(!is_already_created); } } diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc index d9b44a6a925..cc6b39b7532 100644 --- a/test/cpp/util/metrics_server.cc +++ b/test/cpp/util/metrics_server.cc @@ -42,16 +42,26 @@ namespace grpc { namespace testing { -Gauge::Gauge(long initial_val) : val_(initial_val) {} +QpsGauge::QpsGauge() + : start_time_(gpr_now(GPR_CLOCK_REALTIME)), num_queries_(0) {} -void Gauge::Set(long new_val) { - std::lock_guard lock(val_mu_); - val_ = new_val; +void QpsGauge::Reset() { + std::lock_guard lock(num_queries_mu_); + num_queries_ = 0; + start_time_ = gpr_now(GPR_CLOCK_REALTIME); } -long Gauge::Get() { - std::lock_guard lock(val_mu_); - return val_; +void QpsGauge::Incr() { + std::lock_guard lock(num_queries_mu_); + num_queries_++; +} + +long QpsGauge::Get() { + std::lock_guard lock(num_queries_mu_); + gpr_timespec time_diff = + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time_); + long duration_secs = time_diff.tv_sec > 0 ? time_diff.tv_sec : 1; + return num_queries_ / duration_secs; } grpc::Status MetricsServiceImpl::GetAllGauges( @@ -60,7 +70,7 @@ grpc::Status MetricsServiceImpl::GetAllGauges( gpr_log(GPR_DEBUG, "GetAllGauges called"); std::lock_guard lock(mu_); - for (auto it = gauges_.begin(); it != gauges_.end(); it++) { + for (auto it = qps_gauges_.begin(); it != qps_gauges_.end(); it++) { GaugeResponse resp; resp.set_name(it->first); // Gauge name resp.set_long_value(it->second->Get()); // Gauge value @@ -75,8 +85,8 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context, GaugeResponse* response) { std::lock_guard lock(mu_); - const auto it = gauges_.find(request->name()); - if (it != gauges_.end()) { + const auto it = qps_gauges_.find(request->name()); + if (it != qps_gauges_.end()) { response->set_name(it->first); response->set_long_value(it->second->Get()); } @@ -84,16 +94,17 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context, return Status::OK; } -std::shared_ptr MetricsServiceImpl::CreateGauge(const grpc::string& name, - bool* already_present) { +std::shared_ptr MetricsServiceImpl::CreateQpsGauge( + const grpc::string& name, bool* already_present) { std::lock_guard lock(mu_); - std::shared_ptr gauge(new Gauge(0)); - const auto p = gauges_.emplace(name, gauge); + std::shared_ptr qps_gauge(new QpsGauge()); + const auto p = qps_gauges_.emplace(name, qps_gauge); - // p.first is an iterator pointing to > pair. p.second - // is a boolean which is set to 'true' if the Gauge is inserted in the guages_ - // map and 'false' if it is already present in the map + // p.first is an iterator pointing to > pair. + // p.second is a boolean which is set to 'true' if the QpsGauge is + // successfully inserted in the guages_ map and 'false' if it is already + // present in the map *already_present = !p.second; return p.first->second; } diff --git a/test/cpp/util/metrics_server.h b/test/cpp/util/metrics_server.h index ce05e0be64c..b04879c5e69 100644 --- a/test/cpp/util/metrics_server.h +++ b/test/cpp/util/metrics_server.h @@ -36,6 +36,7 @@ #include #include +#include "grpc/support/time.h" #include "src/proto/grpc/testing/metrics.grpc.pb.h" #include "src/proto/grpc/testing/metrics.pb.h" @@ -48,10 +49,13 @@ * Example: * MetricsServiceImpl metricsImpl; * .. - * // Create Gauge(s). Note: Gauges can be created even after calling + * // Create QpsGauge(s). Note: QpsGauges can be created even after calling * // 'StartServer'. - * Gauge gauge1 = metricsImpl.CreateGauge("foo",is_present); - * // gauge1 can now be used anywhere in the program to set values. + * QpsGauge qps_gauge1 = metricsImpl.CreateQpsGauge("foo", is_present); + * // qps_gauge1 can now be used anywhere in the program by first making a + * // one-time call qps_gauge1.Reset() and then calling qps_gauge1.Incr() + * // every time to increment a query counter + * * ... * // Create the metrics server * std::unique_ptr server = metricsImpl.StartServer(port); @@ -60,17 +64,24 @@ namespace grpc { namespace testing { -// TODO(sreek): Add support for other types of Gauges like Double, String in -// future -class Gauge { +class QpsGauge { public: - Gauge(long initial_val); - void Set(long new_val); + QpsGauge(); + + // Initialize the internal timer and reset the query count to 0 + void Reset(); + + // Increment the query count by 1 + void Incr(); + + // Return the current qps (i.e query count divided by the time since this + // QpsGauge object created (or Reset() was called)) long Get(); private: - long val_; - std::mutex val_mu_; + gpr_timespec start_time_; + long num_queries_; + std::mutex num_queries_mu_; }; class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service { @@ -81,17 +92,17 @@ class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service { grpc::Status GetGauge(ServerContext* context, const GaugeRequest* request, GaugeResponse* response) GRPC_OVERRIDE; - // Create a Gauge with name 'name'. is_present is set to true if the Gauge + // Create a QpsGauge with name 'name'. is_present is set to true if the Gauge // is already present in the map. - // NOTE: CreateGauge can be called anytime (i.e before or after calling + // NOTE: CreateQpsGauge can be called anytime (i.e before or after calling // StartServer). - std::shared_ptr CreateGauge(const grpc::string& name, + std::shared_ptr CreateQpsGauge(const grpc::string& name, bool* already_present); std::unique_ptr StartServer(int port); private: - std::map> gauges_; + std::map> qps_gauges_; std::mutex mu_; }; diff --git a/tools/run_tests/stress_test/STRESS_CLIENT_SPEC.md b/tools/run_tests/stress_test/STRESS_CLIENT_SPEC.md index 62ca8aff2cd..9f079beebc0 100644 --- a/tools/run_tests/stress_test/STRESS_CLIENT_SPEC.md +++ b/tools/run_tests/stress_test/STRESS_CLIENT_SPEC.md @@ -6,8 +6,8 @@ This document specifies the features a stress test client should implement in or -------------- **1.** A stress test client should be able to repeatedly execute one or more of the existing 'interop test cases'. It may just be a wrapper around the existing interop test client. The exact command line arguments the client should support are listed in _Table 1_ below. -**2.** The stress test client must implement a metrics server defined by _[metrics.proto](https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/metrics.proto)_ and must expose _qps_ as a long-valued Gauge. The client can track the overall _qps_ in one Gauge or in multiple Gauges (for example: One per Channel or Stub). - The framework periodically queries the _qps_ by calling the `GetAllGauges()` method (the framework assumes that all the returned Gauges are _qps_ Gauges) and uses this to determine if the stress test client is running or crashed or stalled. +**2.** The stress test client must implement a metrics server defined by _[metrics.proto](https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/metrics.proto)_ and must expose _qps_ as a `Long`-valued Gauge. The client can track the overall _qps_ in one Gauge or in multiple Gauges (for example: One per Channel or Stub). + The framework periodically queries the _qps_ by calling the `GetAllGauges()` method (the framework assumes that all the returned Gauges are _qps_ Gauges and adds them up to determine the final qps) and uses this to determine if the stress test client is running or crashed or stalled. > *Note:* In this context, the term _**qps**_ means _interop test cases per second_ (not _messages per second_ or _rpc calls per second_) diff --git a/tools/run_tests/stress_test/configs/asan.json b/tools/run_tests/stress_test/configs/asan.json index c5588553147..cb9f55763b1 100644 --- a/tools/run_tests/stress_test/configs/asan.json +++ b/tools/run_tests/stress_test/configs/asan.json @@ -16,8 +16,7 @@ "num_channels_per_server":5, "num_stubs_per_channel":10, "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1", - "metrics_port": 8081, - "metrics_collection_interval_secs":120 + "metrics_port": 8081 }, "metricsPort": 8081, "metricsArgs": { diff --git a/tools/run_tests/stress_test/configs/opt-tsan-asan.json b/tools/run_tests/stress_test/configs/opt-tsan-asan.json index 4f172ef30bc..936d15169e1 100644 --- a/tools/run_tests/stress_test/configs/opt-tsan-asan.json +++ b/tools/run_tests/stress_test/configs/opt-tsan-asan.json @@ -26,8 +26,7 @@ "num_channels_per_server":5, "num_stubs_per_channel":10, "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1", - "metrics_port": 8081, - "metrics_collection_interval_secs": 60 + "metrics_port": 8081 }, "metricsPort": 8081, "metricsArgs": { diff --git a/tools/run_tests/stress_test/configs/opt.json b/tools/run_tests/stress_test/configs/opt.json index 75505186f20..f45b8240487 100644 --- a/tools/run_tests/stress_test/configs/opt.json +++ b/tools/run_tests/stress_test/configs/opt.json @@ -16,8 +16,7 @@ "num_channels_per_server":5, "num_stubs_per_channel":10, "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1", - "metrics_port": 8081, - "metrics_collection_interval_secs": 60 + "metrics_port": 8081 }, "metricsPort": 8081, "metricsArgs": { diff --git a/tools/run_tests/stress_test/configs/tsan.json b/tools/run_tests/stress_test/configs/tsan.json index a7ec08313d6..6ef3bdf7ea7 100644 --- a/tools/run_tests/stress_test/configs/tsan.json +++ b/tools/run_tests/stress_test/configs/tsan.json @@ -16,8 +16,7 @@ "num_channels_per_server":5, "num_stubs_per_channel":10, "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1", - "metrics_port": 8081, - "metrics_collection_interval_secs":120 + "metrics_port": 8081 }, "metricsPort": 8081, "metricsArgs": {