initial implementation.

pull/11000/head
Yuxuan Li 8 years ago
parent 12056f1a0c
commit 999ac157e6
  1. src/core/lib/surface/completion_queue.c (8 lines changed)
  2. src/core/lib/surface/completion_queue.h (2 lines changed)
  3. src/proto/grpc/testing/control.proto (2 lines changed)
  4. src/proto/grpc/testing/stats.proto (2 lines changed)
  5. test/cpp/microbenchmarks/fullstack_fixtures.h (22 lines changed)
  6. test/cpp/microbenchmarks/helpers.cc (4 lines changed)
  7. test/cpp/microbenchmarks/helpers.h (2 lines changed)
  8. test/cpp/qps/client.h (15 lines changed)
  9. test/cpp/qps/client_async.cc (15 lines changed)
  10. test/cpp/qps/driver.cc (3 lines changed)
  11. test/cpp/qps/qps_json_driver.cc (1 line changed)
  12. test/cpp/qps/report.cc (19 lines changed)
  13. test/cpp/qps/report.h (7 lines changed)

src/core/lib/surface/completion_queue.c
@@ -227,6 +227,7 @@ struct grpc_completion_queue {
/* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
gpr_atm num_poll;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -292,6 +293,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
gpr_atm_no_barrier_store(&cc->num_poll, 0);
gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@@ -308,6 +310,10 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
return cc->completion_type;
}
gpr_atm grpc_get_cq_poll_num(grpc_completion_queue *cc) {
return gpr_atm_no_barrier_load(&cc->num_poll);
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -592,6 +598,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
cc->num_poll++;
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
NULL, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
@@ -784,6 +791,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
gpr_atm_no_barrier_fetch_add(&cc->num_poll, 1);
grpc_error *err = cc->poller_vtable->work(
&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {

src/core/lib/surface/completion_queue.h
@@ -100,6 +100,8 @@ bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
gpr_atm grpc_get_cq_poll_num(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);

src/proto/grpc/testing/control.proto
@@ -241,6 +241,8 @@ message ScenarioResultSummary
// Number of requests that succeeded/failed
double successful_requests_per_second = 13;
double failed_requests_per_second = 14;
double client_polls_per_request = 15;
}
// Results of a single benchmark scenario.

src/proto/grpc/testing/stats.proto
@@ -81,4 +81,6 @@ message ClientStats {
// Number of failed requests (one row per status code seen)
repeated RequestResultCount request_results = 5;
uint64 cq_poll_count = 6;
}

test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -100,6 +100,17 @@ class FullstackFixture : public BaseFixture {
}
}
void Finish(benchmark::State &state) {
std::ostringstream out;
AddToLabel(out, state);
AppendToLabel(out, "polls/iter", (double)grpc_get_cq_poll_num(this->cq()->cq())/state.iterations());
auto label = out.str();
if (label.length() && label[0] == ' ') {
label = label.substr(1);
}
state.SetLabel(label);
}
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
@@ -212,6 +223,17 @@ class EndpointPairFixture : public BaseFixture {
}
}
void Finish(benchmark::State &state) {
std::ostringstream out;
AddToLabel(out, state);
AppendToLabel(out, "polls/iter", (double)grpc_get_cq_poll_num(this->cq()->cq())/state.iterations());
auto label = out.str();
if (label.length() && label[0] == ' ') {
label = label.substr(1);
}
state.SetLabel(label);
}
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
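
Both fixtures' Finish() overrides follow the same pattern: read the CQ poll counter, divide by state.iterations(), and append the result to the benchmark label through the new AppendToLabel() helper, trimming the leading space. A self-contained sketch of just that label-building step; only the "polls/iter" key and the append format come from the diff, the rest is illustrative:

#include <ostream>
#include <sstream>
#include <string>

// Appends a metric as " name:value", matching the helper added in helpers.cc.
static void AppendToLabel(std::ostream& out, const std::string& metric,
                          double value) {
  out << " " << metric << ":" << value;
}

// Produces a label such as "polls/iter:2.5"; the leading space left by the
// first append is trimmed, as Finish() does.
static std::string BuildLabel(double polls_per_iter) {
  std::ostringstream out;
  AppendToLabel(out, "polls/iter", polls_per_iter);
  std::string label = out.str();
  if (!label.empty() && label[0] == ' ') label = label.substr(1);
  return label;
}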

test/cpp/microbenchmarks/helpers.cc
@@ -67,3 +67,7 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) {
(double)state.iterations());
#endif
}
void TrackCounters::AppendToLabel(std::ostream& out, std::string metric, double value) {
out << " " << metric << ":" << value;
}

test/cpp/microbenchmarks/helpers.h
@@ -35,6 +35,7 @@
#define TEST_CPP_MICROBENCHMARKS_COUNTERS_H
#include <sstream>
#include <string>
extern "C" {
#include <grpc/support/port_platform.h>
@@ -79,6 +80,7 @@ class TrackCounters {
public:
virtual void Finish(benchmark::State& state);
virtual void AddToLabel(std::ostream& out, benchmark::State& state);
virtual void AppendToLabel(std::ostream& out, std::string metric, double value);
private:
#ifdef GPR_LOW_LEVEL_COUNTERS

test/cpp/qps/client.h
@@ -150,7 +150,8 @@ class Client {
Client()
: timer_(new UsageTimer),
interarrival_timer_(),
started_requests_(false) {
started_requests_(false),
last_reset_poll_count_(0) {
gpr_event_init(&start_requests_);
}
virtual ~Client() {}
@@ -162,6 +163,7 @@ class Client {
MaybeStartRequests();
int last_reset_poll_count_to_use = last_reset_poll_count_;
if (reset) {
std::vector<Histogram> to_merge(threads_.size());
std::vector<StatusHistogram> to_merge_status(threads_.size());
@@ -176,6 +178,7 @@ class Client {
MergeStatusHistogram(to_merge_status[i], &statuses);
}
timer_result = timer->Mark();
last_reset_poll_count_ = GetPollCount();
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
@@ -195,6 +198,9 @@ class Client {
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
gpr_log(GPR_INFO, "*****poll count : %d %d %d", GetPollCount(), last_reset_poll_count_, last_reset_poll_count_to_use);
stats.set_cq_poll_count(GetPollCount() - last_reset_poll_count_to_use);
return stats;
}
@@ -209,6 +215,11 @@ class Client {
}
}
virtual int GetPollCount() {
// For sync client.
return 0;
}
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
@@ -351,6 +362,8 @@ class Client {
gpr_event start_requests_;
bool started_requests_;
int last_reset_poll_count_;
void MaybeStartRequests() {
if (!started_requests_) {
started_requests_ = true;
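
Within Mark(), the poll counter gets the same reset semantics as the histograms: the previous baseline is captured in last_reset_poll_count_to_use, a new baseline is stored only when a reset is requested, and cq_poll_count reports the difference, so each stats window counts only its own polls. A reduced sketch of that accounting, with an illustrative class and callback standing in for the real Client and its virtual GetPollCount():

#include <functional>
#include <utility>

// Stand-in for the Client's reset bookkeeping; get_poll_count_ plays the role
// of the virtual GetPollCount() hook (0 for sync clients, summed CQ polls for
// async clients).
class PollWindow {
 public:
  explicit PollWindow(std::function<long()> get_poll_count)
      : get_poll_count_(std::move(get_poll_count)) {}

  // Polls observed since the previous reset; when `reset` is true, the current
  // total becomes the baseline for the next window.
  long Mark(bool reset) {
    long baseline = last_reset_poll_count_;
    long now = get_poll_count_();
    if (reset) last_reset_poll_count_ = now;
    return now - baseline;
  }

 private:
  std::function<long()> get_poll_count_;
  long last_reset_poll_count_ = 0;
};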

test/cpp/qps/client_async.cc
@@ -54,6 +54,10 @@
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
extern "C" {
#include "src/core/lib/surface/completion_queue.h"
}
namespace grpc {
namespace testing {
@@ -205,6 +209,17 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
int GetPollCount() {
int count = 0;
int i = 0;
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
int k = (int)grpc_get_cq_poll_num((*cq)->cq());
gpr_log(GPR_INFO, "%d: per cq poll:%d", i++, k);
count += k;
}
return count;
}
protected:
const int num_async_threads_;

test/cpp/qps/driver.cc
@@ -112,6 +112,7 @@ static deque<string> get_workers(const string& env_name) {
static double WallTime(ClientStats s) { return s.time_elapsed(); }
static double SystemTime(ClientStats s) { return s.time_system(); }
static double UserTime(ClientStats s) { return s.time_user(); }
static double PollCount(ClientStats s) { return s.cq_poll_count(); }
static double ServerWallTime(ServerStats s) { return s.time_elapsed(); }
static double ServerSystemTime(ServerStats s) { return s.time_system(); }
static double ServerUserTime(ServerStats s) { return s.time_user(); }
@@ -180,6 +181,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
result->mutable_summary()->set_failed_requests_per_second(failures /
time_estimate);
}
gpr_log(GPR_INFO, "poll count : %f", sum(result->client_stats(), PollCount));
result->mutable_summary()->set_client_polls_per_request(sum(result->client_stats(), PollCount)/histogram.Count());
}
std::unique_ptr<ScenarioResult> RunScenario(
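
postprocess_scenario_result() then sums cq_poll_count over all client stats and divides by the total request count taken from the latency histogram, which is what lands in client_polls_per_request. The arithmetic in isolation (the function and parameter names below are illustrative):

#include <vector>

// Stand-in for sum(result->client_stats(), PollCount) / histogram.Count().
static double ClientPollsPerRequest(const std::vector<double>& per_client_polls,
                                    double total_requests) {
  double total_polls = 0;
  for (double polls : per_client_polls) total_polls += polls;
  return total_requests > 0 ? total_polls / total_requests : 0;
}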

test/cpp/qps/qps_json_driver.cc
@@ -94,6 +94,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
GetReporter()->ReportCpuUsage(*result);
GetReporter()->ReportPollCount(*result);
for (int i = 0; *success && i < result->client_success_size(); i++) {
*success = result->client_success(i);

test/cpp/qps/report.cc
@@ -80,6 +80,12 @@ void CompositeReporter::ReportCpuUsage(const ScenarioResult& result) {
}
}
void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
for (size_t i = 0; i < reporters_.size(); ++i) {
reporters_[i]->ReportPollCount(result);
}
}
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
if (result.summary().failed_requests_per_second() > 0) {
@@ -121,6 +127,11 @@ void GprLogReporter::ReportCpuUsage(const ScenarioResult& result) {
result.summary().server_cpu_usage());
}
void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Client Polls per Request: %.2f",
result.summary().client_polls_per_request());
}
void JsonReporter::ReportQPS(const ScenarioResult& result) {
grpc::string json_string =
SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -145,6 +156,10 @@ void JsonReporter::ReportCpuUsage(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void JsonReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportQPS(const ScenarioResult& result) {
grpc::ClientContext context;
grpc::Status status;
@@ -177,5 +192,9 @@ void RpcReporter::ReportCpuUsage(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
} // namespace testing
} // namespace grpc

test/cpp/qps/report.h
@@ -76,6 +76,9 @@ class Reporter {
/** Reports server cpu usage. */
virtual void ReportCpuUsage(const ScenarioResult& result) = 0;
/** Reports client completion-queue polls per request. */
virtual void ReportPollCount(const ScenarioResult& result) = 0;
private:
const string name_;
};
@@ -93,6 +96,7 @@ class CompositeReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
private:
std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -109,6 +113,7 @@ class GprLogReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
};
/** Dumps the report to a JSON file. */
@@ -123,6 +128,7 @@ class JsonReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
const string report_file_;
};
@@ -138,6 +144,7 @@ class RpcReporter : public Reporter {
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
};
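
The report.cc/report.h hunks follow the project's usual recipe for surfacing a new summary metric: a pure virtual hook on Reporter, a fan-out in CompositeReporter, a real implementation in GprLogReporter, and NOPs in JsonReporter/RpcReporter, whose ReportQPS already serializes the whole result. A stripped-down sketch of that fan-out pattern using stand-in types (not the real interface):

#include <memory>
#include <utility>
#include <vector>

struct ScenarioSummary {  // illustrative stand-in for ScenarioResult
  double client_polls_per_request = 0;
};

class MetricReporter {
 public:
  virtual ~MetricReporter() = default;
  virtual void ReportPollCount(const ScenarioSummary& result) = 0;
};

// Fans the call out to every registered reporter, as CompositeReporter does.
class CompositeMetricReporter : public MetricReporter {
 public:
  void Add(std::unique_ptr<MetricReporter> reporter) {
    reporters_.push_back(std::move(reporter));
  }
  void ReportPollCount(const ScenarioSummary& result) override {
    for (auto& reporter : reporters_) reporter->ReportPollCount(result);
  }

 private:
  std::vector<std::unique_ptr<MetricReporter>> reporters_;
};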
