Merge pull request #11000 from lyuxuan/poll_stat

Get metric "polls/request" for benchmarks
lyuxuan 8 years ago committed by GitHub
commit d0b47db0c0
  1. src/core/lib/surface/completion_queue.c (12 changes)
  2. src/core/lib/surface/completion_queue.h (10 changes)
  3. src/proto/grpc/testing/control.proto (4 changes)
  4. src/proto/grpc/testing/stats.proto (6 changes)
  5. test/cpp/microbenchmarks/fullstack_fixtures.h (14 changes)
  6. test/cpp/qps/client.h (15 changes)
  7. test/cpp/qps/client_async.cc (8 changes)
  8. test/cpp/qps/driver.cc (7 changes)
  9. test/cpp/qps/qps_json_driver.cc (1 change)
  10. test/cpp/qps/report.cc (21 changes)
  11. test/cpp/qps/report.h (7 changes)
  12. test/cpp/qps/server.h (14 changes)
  13. test/cpp/qps/server_async.cc (8 changes)
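
Taken together, the diffs below add a per-completion-queue poll counter, expose it through `grpc_get_cq_poll_num()`, sum it across each worker's completion queues, and divide by the request count in the driver. A minimal self-contained sketch of that flow (all names here are illustrative, not the actual gRPC internals):

```cpp
#include <mutex>
#include <vector>

// Illustrative stand-in for grpc_completion_queue: the real change
// increments cqd->num_polls under the cq mutex on every poll.
struct FakeCq {
  std::mutex mu;
  int num_polls = 0;
  void Poll() {
    std::lock_guard<std::mutex> lock(mu);
    ++num_polls;
  }
};

// Like the GetPollCount() overrides below: sum the counters of every
// completion queue the client/server owns.
int GetPollCount(std::vector<FakeCq*>& cqs) {
  int count = 0;
  for (auto* cq : cqs) {
    std::lock_guard<std::mutex> lock(cq->mu);
    count += cq->num_polls;
  }
  return count;
}

// Like the driver.cc change: polls/request is the total poll count
// across workers divided by the number of recorded requests.
double PollsPerRequest(double total_polls, double total_requests) {
  return total_polls / total_requests;
}
```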

@@ -262,6 +262,7 @@ typedef struct cq_data {
int is_server_cq;
int num_pluckers;
int num_polls;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -425,6 +426,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
cqd->shutdown_called = 0;
cqd->is_server_cq = 0;
cqd->num_pluckers = 0;
cqd->num_polls = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
#ifndef NDEBUG
cqd->outstanding_tag_count = 0;
@@ -442,6 +444,14 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
return cc->vtable->cq_completion_type;
}
int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
int cur_num_polls;
gpr_mu_lock(cc->data.mu);
cur_num_polls = cc->data.num_polls;
gpr_mu_unlock(cc->data.mu);
return cur_num_polls;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -830,6 +840,7 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cqd->mu);
cqd->num_polls++;
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
NULL, now, iteration_deadline);
gpr_mu_unlock(cqd->mu);
@@ -1015,6 +1026,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
break;
}
cqd->num_polls++;
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, deadline);
if (err != GRPC_ERROR_NONE) {

@@ -49,6 +49,10 @@ extern grpc_tracer_flag grpc_trace_pending_tags;
extern grpc_tracer_flag grpc_trace_pending_tags;
#endif
#ifdef __cplusplus
extern "C" {
#endif
typedef struct grpc_cq_completion {
gpr_mpscq_node node;
@@ -103,7 +107,13 @@ bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
int grpc_get_cq_poll_num(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
#ifdef __cplusplus
}
#endif
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
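
The new `extern "C"` guard is what lets the C++ benchmark code below include this core header directly. Reading the counter through the C++ wrapper then looks roughly like this (a sketch of the pattern used by the fixtures and `GetPollCount()` overrides further down; `CompletionQueue::cq()` exposes the underlying `grpc_completion_queue*`):

```cpp
#include <grpc++/completion_queue.h>

#include "src/core/lib/surface/completion_queue.h"

// Sketch: total polls performed so far on one C++ completion queue.
int PollsSoFar(grpc::CompletionQueue* cq) {
  return grpc_get_cq_poll_num(cq->cq());
}
```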

@@ -244,6 +244,10 @@ message ScenarioResultSummary
// Number of requests that succeeded/failed
double successful_requests_per_second = 13;
double failed_requests_per_second = 14;
// Number of polls called inside completion queue per request
double client_polls_per_request = 15;
double server_polls_per_request = 16;
}
// Results of a single benchmark scenario.

@@ -47,6 +47,9 @@ message ServerStats {
// change in idle time of the server (data from proc/stat)
uint64 idle_cpu_time = 5;
// Number of polls called inside completion queue
uint64 cq_poll_count = 6;
}
// Histogram params based on grpc/support/histogram.c
@@ -81,4 +84,7 @@ message ClientStats {
// Number of failed requests (one row per status code seen)
repeated RequestResultCount request_results = 5;
// Number of polls called inside completion queue
uint64 cq_poll_count = 6;
}
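
With the new fields, per-interval polls/request for a single worker can be computed from a stats message alone (a sketch using the generated C++ classes; the request count here is an assumed input):

```cpp
#include "src/proto/grpc/testing/stats.pb.h"

// Sketch: cq_poll_count is reported as a per-interval delta (see the
// Mark(reset) logic in client.h/server.h below), so one division
// yields the metric for a single worker.
double ClientPollsPerRequest(const grpc::testing::ClientStats& stats,
                             double requests_in_interval) {
  return static_cast<double>(stats.cq_poll_count()) /
         requests_in_interval;
}
```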

@@ -100,6 +100,12 @@ class FullstackFixture : public BaseFixture {
}
}
void AddToLabel(std::ostream& out, benchmark::State& state) {
BaseFixture::AddToLabel(out, state);
out << " polls/iter:"
<< (double)grpc_get_cq_poll_num(this->cq()->cq()) / state.iterations();
}
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
@@ -212,6 +218,12 @@ class EndpointPairFixture : public BaseFixture {
}
}
void AddToLabel(std::ostream& out, benchmark::State& state) {
BaseFixture::AddToLabel(out, state);
out << " polls/iter:"
<< (double)grpc_get_cq_poll_num(this->cq()->cq()) / state.iterations();
}
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
@@ -245,7 +257,7 @@ class InProcessCHTTP2 : public EndpointPairFixture {
void AddToLabel(std::ostream& out, benchmark::State& state) {
EndpointPairFixture::AddToLabel(out, state);
out << " writes/iter:"
-        << ((double)stats_.num_writes / (double)state.iterations());
+        << (double)stats_.num_writes / (double)state.iterations();
}
private:

@@ -46,6 +46,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/surface/completion_queue.h"
#include "src/proto/grpc/testing/payloads.pb.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -150,7 +151,8 @@ class Client {
Client()
: timer_(new UsageTimer),
interarrival_timer_(),
-        started_requests_(false) {
+        started_requests_(false),
+        last_reset_poll_count_(0) {
gpr_event_init(&start_requests_);
}
virtual ~Client() {}
@@ -162,6 +164,8 @@
MaybeStartRequests();
int cur_poll_count = GetPollCount();
int poll_count = cur_poll_count - last_reset_poll_count_;
if (reset) {
std::vector<Histogram> to_merge(threads_.size());
std::vector<StatusHistogram> to_merge_status(threads_.size());
@@ -176,6 +180,7 @@
MergeStatusHistogram(to_merge_status[i], &statuses);
}
timer_result = timer->Mark();
last_reset_poll_count_ = cur_poll_count;
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
@@ -195,6 +200,7 @@
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
stats.set_cq_poll_count(poll_count);
return stats;
}
@@ -209,6 +215,11 @@
}
}
virtual int GetPollCount() {
// For sync client.
return 0;
}
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
@@ -351,6 +362,8 @@ class Client {
gpr_event start_requests_;
bool started_requests_;
int last_reset_poll_count_;
void MaybeStartRequests() {
if (!started_requests_) {
started_requests_ = true;
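
The `Mark(reset)` logic above converts the monotonically increasing poll counter into per-interval deltas; the same pattern in isolation (a minimal standalone sketch, not the actual classes):

```cpp
// Minimal sketch of the delta-on-reset bookkeeping shared by
// Client::Mark and Server::Mark in this PR.
class IntervalCounter {
 public:
  // cur: current value of a monotonically increasing counter.
  // Returns its growth since the last reset; when reset is true,
  // the next interval is measured from cur.
  int Mark(int cur, bool reset) {
    int delta = cur - last_reset_;
    if (reset) last_reset_ = cur;
    return delta;
  }

 private:
  int last_reset_ = 0;
};
```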

@@ -205,6 +205,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
int GetPollCount() override {
int count = 0;
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
count += grpc_get_cq_poll_num((*cq)->cq());
}
return count;
}
protected:
const int num_async_threads_;

@@ -112,6 +112,8 @@ static deque<string> get_workers(const string& env_name) {
static double WallTime(ClientStats s) { return s.time_elapsed(); }
static double SystemTime(ClientStats s) { return s.time_system(); }
static double UserTime(ClientStats s) { return s.time_user(); }
static double CliPollCount(ClientStats s) { return s.cq_poll_count(); }
static double SvrPollCount(ServerStats s) { return s.cq_poll_count(); }
static double ServerWallTime(ServerStats s) { return s.time_elapsed(); }
static double ServerSystemTime(ServerStats s) { return s.time_system(); }
static double ServerUserTime(ServerStats s) { return s.time_user(); }
@@ -180,6 +182,11 @@ static void postprocess_scenario_result(ScenarioResult* result) {
result->mutable_summary()->set_failed_requests_per_second(failures /
time_estimate);
}
result->mutable_summary()->set_client_polls_per_request(
sum(result->client_stats(), CliPollCount) / histogram.Count());
result->mutable_summary()->set_server_polls_per_request(
sum(result->server_stats(), SvrPollCount) / histogram.Count());
}
std::unique_ptr<ScenarioResult> RunScenario(
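
For a concrete sense of the arithmetic (illustrative numbers, not measured data): if two clients report `cq_poll_count` deltas of 1,200,000 and 1,300,000 over an interval in which the latency histogram recorded 1,000,000 requests, the summary works out as follows:

```cpp
#include <cstdio>

int main() {
  // Illustrative numbers only.
  double total_client_polls = 1200000 + 1300000;
  double total_requests = 1000000;
  // Matches set_client_polls_per_request above: 2.50 polls/request.
  std::printf("client polls/request: %.2f\n",
              total_client_polls / total_requests);
  return 0;
}
```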

@@ -94,6 +94,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
GetReporter()->ReportCpuUsage(*result);
GetReporter()->ReportPollCount(*result);
for (int i = 0; *success && i < result->client_success_size(); i++) {
*success = result->client_success(i);

@@ -80,6 +80,12 @@ void CompositeReporter::ReportCpuUsage(const ScenarioResult& result) {
}
}
void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
for (size_t i = 0; i < reporters_.size(); ++i) {
reporters_[i]->ReportPollCount(result);
}
}
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
if (result.summary().failed_requests_per_second() > 0) {
@@ -121,6 +127,13 @@ void GprLogReporter::ReportCpuUsage(const ScenarioResult& result) {
result.summary().server_cpu_usage());
}
void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Client Polls per Request: %.2f",
result.summary().client_polls_per_request());
gpr_log(GPR_INFO, "Server Polls per Request: %.2f",
result.summary().server_polls_per_request());
}
void JsonReporter::ReportQPS(const ScenarioResult& result) {
grpc::string json_string =
SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -145,6 +158,10 @@ void JsonReporter::ReportCpuUsage(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void JsonReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportQPS(const ScenarioResult& result) {
grpc::ClientContext context;
grpc::Status status;
@@ -177,5 +194,9 @@ void RpcReporter::ReportCpuUsage(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
} // namespace testing
} // namespace grpc

@@ -76,6 +76,9 @@ class Reporter {
/** Reports server cpu usage. */
virtual void ReportCpuUsage(const ScenarioResult& result) = 0;
/** Reports client and server poll usage inside completion queue. */
virtual void ReportPollCount(const ScenarioResult& result) = 0;
private:
const string name_;
};
@@ -93,6 +96,7 @@ class CompositeReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
private:
std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -109,6 +113,7 @@ class GprLogReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
};
/** Dumps the report to a JSON file. */
@@ -123,6 +128,7 @@ class JsonReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
const string report_file_;
};
@@ -138,6 +144,7 @@ class RpcReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
};

@@ -38,6 +38,7 @@
#include <grpc/support/cpu.h>
#include <vector>
#include "src/core/lib/surface/completion_queue.h"
#include "src/proto/grpc/testing/control.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/end2end/data/ssl_test_data.h"
@@ -49,7 +50,8 @@ namespace testing {
class Server {
public:
-  explicit Server(const ServerConfig& config) : timer_(new UsageTimer) {
+  explicit Server(const ServerConfig& config)
+      : timer_(new UsageTimer), last_reset_poll_count_(0) {
cores_ = gpr_cpu_num_cores();
if (config.port()) {
port_ = config.port();
@@ -62,10 +64,13 @@ class Server {
ServerStats Mark(bool reset) {
UsageTimer::Result timer_result;
int cur_poll_count = GetPollCount();
int poll_count = cur_poll_count - last_reset_poll_count_;
if (reset) {
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer.swap(timer_);
timer_result = timer->Mark();
last_reset_poll_count_ = cur_poll_count;
} else {
timer_result = timer_->Mark();
}
@@ -76,6 +81,7 @@ class Server {
stats.set_time_user(timer_result.user);
stats.set_total_cpu_time(timer_result.total_cpu_time);
stats.set_idle_cpu_time(timer_result.idle_cpu_time);
stats.set_cq_poll_count(poll_count);
return stats;
}
@@ -106,10 +112,16 @@ class Server {
}
}
virtual int GetPollCount() {
// For sync server.
return 0;
}
private:
int port_;
int cores_;
std::unique_ptr<UsageTimer> timer_;
int last_reset_poll_count_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);

@@ -186,6 +186,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
shutdown_thread.join();
}
int GetPollCount() override {
int count = 0;
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
count += grpc_get_cq_poll_num((*cq)->cq());
}
return count;
}
private:
void ShutdownThreadFunc() {
// TODO (vpai): Remove this deadline and allow Shutdown to finish properly
