Simplify QPS Metrics collection

pull/6260/head
Sree Kuchibhotla 9 years ago
parent 241dea56c8
commit 3714e302c0
  1. 40
      test/cpp/interop/stress_interop_client.cc
  2. 8
      test/cpp/interop/stress_interop_client.h
  3. 13
      test/cpp/interop/stress_test.cc
  4. 45
      test/cpp/util/metrics_server.cc
  5. 39
      test/cpp/util/metrics_server.h
  6. 4
      tools/run_tests/stress_test/STRESS_CLIENT_SPEC.md
  7. 3
      tools/run_tests/stress_test/configs/asan.json
  8. 3
      tools/run_tests/stress_test/configs/opt-tsan-asan.json
  9. 3
      tools/run_tests/stress_test/configs/opt.json
  10. 3
      tools/run_tests/stress_test/configs/tsan.json

@ -84,49 +84,37 @@ StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms, long metrics_collection_interval_secs)
long sleep_duration_ms)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
interop_client_(new InteropClient(channel, false)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
sleep_duration_ms_(sleep_duration_ms),
metrics_collection_interval_secs_(metrics_collection_interval_secs) {}
sleep_duration_ms_(sleep_duration_ms) {}
void StressTestInteropClient::MainLoop(std::shared_ptr<Gauge> qps_gauge) {
void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) {
gpr_log(GPR_INFO, "Running test %d. ServerAddr: %s", test_id_,
server_address_.c_str());
gpr_timespec test_end_time =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN));
gpr_timespec test_end_time;
if (test_duration_secs_ < 0) {
test_end_time = gpr_inf_future(GPR_CLOCK_REALTIME);
} else {
test_end_time =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN));
}
gpr_timespec current_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec next_stat_collection_time = current_time;
gpr_timespec collection_interval =
gpr_time_from_seconds(metrics_collection_interval_secs_, GPR_TIMESPAN);
long num_calls_per_interval = 0;
qps_gauge->Reset();
while (test_duration_secs_ < 0 ||
gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) {
while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) {
// Select the test case to execute based on the weights and execute it
TestCaseType test_case = test_selector_.GetNextTest();
gpr_log(GPR_DEBUG, "%d - Executing the test case %d", test_id_, test_case);
RunTest(test_case);
num_calls_per_interval++;
// See if its time to collect stats yet
current_time = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(next_stat_collection_time, current_time) < 0) {
qps_gauge->Set(num_calls_per_interval /
metrics_collection_interval_secs_);
num_calls_per_interval = 0;
next_stat_collection_time =
gpr_time_add(current_time, collection_interval);
}
qps_gauge->Incr();
// Sleep between successive calls if needed
if (sleep_duration_ms_ > 0) {

@ -87,12 +87,11 @@ class StressTestInteropClient {
StressTestInteropClient(int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms,
long metrics_collection_interval_secs);
long test_duration_secs, long sleep_duration_ms);
// The main function. Use this as the thread entry point.
// qps_gauge is the Gauge to record the requests per second metric
void MainLoop(std::shared_ptr<Gauge> qps_gauge);
// qps_gauge is the QpsGauge to record the requests per second metric
void MainLoop(std::shared_ptr<QpsGauge> qps_gauge);
private:
void RunTest(TestCaseType test_case);
@ -104,7 +103,6 @@ class StressTestInteropClient {
const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_;
long sleep_duration_ms_;
long metrics_collection_interval_secs_;
};
} // namespace testing

@ -56,9 +56,6 @@ extern void gpr_default_log(gpr_log_func_args* args);
DEFINE_int32(metrics_port, 8081, "The metrics server port.");
DEFINE_int32(metrics_collection_interval_secs, 5,
"How often (in seconds) should metrics be recorded.");
DEFINE_int32(sleep_duration_ms, 0,
"The duration (in millisec) between two"
" consecutive test calls (per server) issued by the server.");
@ -275,19 +272,19 @@ int main(int argc, char** argv) {
stub_idx++) {
StressTestInteropClient* client = new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
FLAGS_sleep_duration_ms, FLAGS_metrics_collection_interval_secs);
FLAGS_sleep_duration_ms);
bool is_already_created;
// Gauge name
bool is_already_created = false;
// QpsGauge name
std::snprintf(buffer, sizeof(buffer),
"/stress_test/server_%d/channel_%d/stub_%d/qps",
server_idx, channel_idx, stub_idx);
test_threads.emplace_back(grpc::thread(
&StressTestInteropClient::MainLoop, client,
metrics_service.CreateGauge(buffer, &is_already_created)));
metrics_service.CreateQpsGauge(buffer, &is_already_created)));
// The Gauge should not have been already created
// The QpsGauge should not have been already created
GPR_ASSERT(!is_already_created);
}
}

@ -42,16 +42,26 @@
namespace grpc {
namespace testing {
Gauge::Gauge(long initial_val) : val_(initial_val) {}
QpsGauge::QpsGauge()
: start_time_(gpr_now(GPR_CLOCK_REALTIME)), num_queries_(0) {}
void Gauge::Set(long new_val) {
std::lock_guard<std::mutex> lock(val_mu_);
val_ = new_val;
void QpsGauge::Reset() {
std::lock_guard<std::mutex> lock(num_queries_mu_);
num_queries_ = 0;
start_time_ = gpr_now(GPR_CLOCK_REALTIME);
}
long Gauge::Get() {
std::lock_guard<std::mutex> lock(val_mu_);
return val_;
void QpsGauge::Incr() {
std::lock_guard<std::mutex> lock(num_queries_mu_);
num_queries_++;
}
long QpsGauge::Get() {
std::lock_guard<std::mutex> lock(num_queries_mu_);
gpr_timespec time_diff =
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time_);
long duration_secs = time_diff.tv_sec > 0 ? time_diff.tv_sec : 1;
return num_queries_ / duration_secs;
}
grpc::Status MetricsServiceImpl::GetAllGauges(
@ -60,7 +70,7 @@ grpc::Status MetricsServiceImpl::GetAllGauges(
gpr_log(GPR_DEBUG, "GetAllGauges called");
std::lock_guard<std::mutex> lock(mu_);
for (auto it = gauges_.begin(); it != gauges_.end(); it++) {
for (auto it = qps_gauges_.begin(); it != qps_gauges_.end(); it++) {
GaugeResponse resp;
resp.set_name(it->first); // Gauge name
resp.set_long_value(it->second->Get()); // Gauge value
@ -75,8 +85,8 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context,
GaugeResponse* response) {
std::lock_guard<std::mutex> lock(mu_);
const auto it = gauges_.find(request->name());
if (it != gauges_.end()) {
const auto it = qps_gauges_.find(request->name());
if (it != qps_gauges_.end()) {
response->set_name(it->first);
response->set_long_value(it->second->Get());
}
@ -84,16 +94,17 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context,
return Status::OK;
}
std::shared_ptr<Gauge> MetricsServiceImpl::CreateGauge(const grpc::string& name,
bool* already_present) {
std::shared_ptr<QpsGauge> MetricsServiceImpl::CreateQpsGauge(
const grpc::string& name, bool* already_present) {
std::lock_guard<std::mutex> lock(mu_);
std::shared_ptr<Gauge> gauge(new Gauge(0));
const auto p = gauges_.emplace(name, gauge);
std::shared_ptr<QpsGauge> qps_gauge(new QpsGauge());
const auto p = qps_gauges_.emplace(name, qps_gauge);
// p.first is an iterator pointing to <name, shared_ptr<Gauge>> pair. p.second
// is a boolean which is set to 'true' if the Gauge is inserted in the guages_
// map and 'false' if it is already present in the map
// p.first is an iterator pointing to <name, shared_ptr<QpsGauge>> pair.
// p.second is a boolean which is set to 'true' if the QpsGauge is
// successfully inserted in the guages_ map and 'false' if it is already
// present in the map
*already_present = !p.second;
return p.first->second;
}

@ -36,6 +36,7 @@
#include <map>
#include <mutex>
#include "grpc/support/time.h"
#include "src/proto/grpc/testing/metrics.grpc.pb.h"
#include "src/proto/grpc/testing/metrics.pb.h"
@ -48,10 +49,13 @@
* Example:
* MetricsServiceImpl metricsImpl;
* ..
* // Create Gauge(s). Note: Gauges can be created even after calling
* // Create QpsGauge(s). Note: QpsGauges can be created even after calling
* // 'StartServer'.
* Gauge gauge1 = metricsImpl.CreateGauge("foo",is_present);
* // gauge1 can now be used anywhere in the program to set values.
* QpsGauge qps_gauge1 = metricsImpl.CreateQpsGauge("foo", is_present);
* // qps_gauge1 can now be used anywhere in the program by first making a
* // one-time call qps_gauge1.Reset() and then calling qps_gauge1.Incr()
* // every time to increment a query counter
*
* ...
* // Create the metrics server
* std::unique_ptr<grpc::Server> server = metricsImpl.StartServer(port);
@ -60,17 +64,24 @@
namespace grpc {
namespace testing {
// TODO(sreek): Add support for other types of Gauges like Double, String in
// future
class Gauge {
class QpsGauge {
public:
Gauge(long initial_val);
void Set(long new_val);
QpsGauge();
// Initialize the internal timer and reset the query count to 0
void Reset();
// Increment the query count by 1
void Incr();
// Return the current qps (i.e query count divided by the time since this
// QpsGauge object created (or Reset() was called))
long Get();
private:
long val_;
std::mutex val_mu_;
gpr_timespec start_time_;
long num_queries_;
std::mutex num_queries_mu_;
};
class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service {
@ -81,17 +92,17 @@ class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service {
grpc::Status GetGauge(ServerContext* context, const GaugeRequest* request,
GaugeResponse* response) GRPC_OVERRIDE;
// Create a Gauge with name 'name'. is_present is set to true if the Gauge
// Create a QpsGauge with name 'name'. is_present is set to true if the Gauge
// is already present in the map.
// NOTE: CreateGauge can be called anytime (i.e before or after calling
// NOTE: CreateQpsGauge can be called anytime (i.e before or after calling
// StartServer).
std::shared_ptr<Gauge> CreateGauge(const grpc::string& name,
std::shared_ptr<QpsGauge> CreateQpsGauge(const grpc::string& name,
bool* already_present);
std::unique_ptr<grpc::Server> StartServer(int port);
private:
std::map<string, std::shared_ptr<Gauge>> gauges_;
std::map<string, std::shared_ptr<QpsGauge>> qps_gauges_;
std::mutex mu_;
};

@ -6,8 +6,8 @@ This document specifies the features a stress test client should implement in or
--------------
**1.** A stress test client should be able to repeatedly execute one or more of the existing 'interop test cases'. It may just be a wrapper around the existing interop test client. The exact command line arguments the client should support are listed in _Table 1_ below.
**2.** The stress test client must implement a metrics server defined by _[metrics.proto](https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/metrics.proto)_ and must expose _qps_ as a long-valued Gauge. The client can track the overall _qps_ in one Gauge or in multiple Gauges (for example: One per Channel or Stub).
The framework periodically queries the _qps_ by calling the `GetAllGauges()` method (the framework assumes that all the returned Gauges are _qps_ Gauges) and uses this to determine if the stress test client is running or crashed or stalled.
**2.** The stress test client must implement a metrics server defined by _[metrics.proto](https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/metrics.proto)_ and must expose _qps_ as a `Long`-valued Gauge. The client can track the overall _qps_ in one Gauge or in multiple Gauges (for example: One per Channel or Stub).
The framework periodically queries the _qps_ by calling the `GetAllGauges()` method (the framework assumes that all the returned Gauges are _qps_ Gauges and adds them up to determine the final qps) and uses this to determine if the stress test client is running or crashed or stalled.
> *Note:* In this context, the term _**qps**_ means _interop test cases per second_ (not _messages per second_ or _rpc calls per second_)

@ -16,8 +16,7 @@
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs":120
"metrics_port": 8081
},
"metricsPort": 8081,
"metricsArgs": {

@ -26,8 +26,7 @@
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs": 60
"metrics_port": 8081
},
"metricsPort": 8081,
"metricsArgs": {

@ -16,8 +16,7 @@
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs": 60
"metrics_port": 8081
},
"metricsPort": 8081,
"metricsArgs": {

@ -16,8 +16,7 @@
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs":120
"metrics_port": 8081
},
"metricsPort": 8081,
"metricsArgs": {

Loading…
Cancel
Save