pull/2151/head
Siddharth Rakesh 10 years ago
commit d5a83fdce8
  1. 6
      include/grpc/grpc.h
  2. 44
      src/core/transport/chttp2_transport.c
  3. 2
      test/core/network_benchmarks/low_level_ping_pong.c
  4. 2
      test/cpp/qps/qps_driver.cc
  5. 2
      test/cpp/qps/qps_test.cc
  6. 10
      test/cpp/qps/report.cc
  7. 9
      test/cpp/qps/report.h
  8. 3
      test/cpp/qps/server_async.cc

@ -361,7 +361,8 @@ grpc_completion_queue *grpc_completion_queue_create(void);
/** Blocks until an event is available, the completion queue is being shut down, /** Blocks until an event is available, the completion queue is being shut down,
or deadline is reached. or deadline is reached.
Returns NULL on timeout, otherwise the event that occurred. Returns a grpc_event with type GRPC_QUEUE_TIMEOUT on timeout,
otherwise a grpc_event describing the event that occurred.
Callers must not call grpc_completion_queue_next and Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */ grpc_completion_queue_pluck simultaneously on the same completion queue. */
@ -371,7 +372,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
/** Blocks until an event with tag 'tag' is available, the completion queue is /** Blocks until an event with tag 'tag' is available, the completion queue is
being shutdown or deadline is reached. being shutdown or deadline is reached.
Returns NULL on timeout, or a pointer to the event that occurred. Returns a grpc_event with type GRPC_QUEUE_TIMEOUT on timeout,
otherwise a grpc_event describing the event that occurred.
Callers must not call grpc_completion_queue_next and Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */ grpc_completion_queue_pluck simultaneously on the same completion queue. */

@ -230,7 +230,10 @@ struct transport {
/* basic state management - what are we doing at the moment? */ /* basic state management - what are we doing at the moment? */
gpr_uint8 reading; gpr_uint8 reading;
gpr_uint8 writing; gpr_uint8 writing;
gpr_uint8 calling_back; /** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying; gpr_uint8 destroying;
gpr_uint8 closed; gpr_uint8 closed;
error_state error_state; error_state error_state;
@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value); gpr_uint32 value);
static int prepare_callbacks(transport *t); static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); static void run_callbacks(transport *t);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t); static int prepare_write(transport *t);
@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
} }
gpr_mu_lock(&t->mu); gpr_mu_lock(&t->mu);
t->calling_back = 1; t->calling_back_channel = 1;
ref_transport(t); /* matches unref at end of this function */ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu); gpr_mu_unlock(&t->mu);
@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
lock(t); lock(t);
t->cb = sr.callbacks; t->cb = sr.callbacks;
t->cb_user_data = sr.user_data; t->cb_user_data = sr.user_data;
t->calling_back = 0; t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv); if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t); unlock(t);
@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */ becomes 0. */
while (t->calling_back || t->writing) { while (t->calling_back_channel || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
} }
drop_connection(t); drop_connection(t);
@ -830,12 +833,15 @@ static void unlock(transport *t) {
finish_reads(t); finish_reads(t);
/* gather any callbacks that need to be made */ /* gather any callbacks that need to be made */
if (!t->calling_back) { if (!t->calling_back_ops) {
t->calling_back = perform_callbacks = prepare_callbacks(t); t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
if (cb) { if (perform_callbacks) ref_transport(t);
}
if (!t->calling_back_channel && cb) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) { if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1; call_closed = 1;
t->calling_back = 1; t->calling_back_channel = 1;
t->cb = NULL; /* no more callbacks */ t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED; t->error_state = ERROR_STATE_NOTIFIED;
} }
@ -845,14 +851,12 @@ static void unlock(transport *t) {
t->pending_goaways = NULL; t->pending_goaways = NULL;
t->num_pending_goaways = 0; t->num_pending_goaways = 0;
t->cap_pending_goaways = 0; t->cap_pending_goaways = 0;
t->calling_back = 1; t->calling_back_channel = 1;
}
} }
} if (call_closed || num_goaways) {
if (perform_callbacks || call_closed || num_goaways) {
ref_transport(t); ref_transport(t);
} }
}
/* finally unlock */ /* finally unlock */
gpr_mu_unlock(&t->mu); gpr_mu_unlock(&t->mu);
@ -865,7 +869,11 @@ static void unlock(transport *t) {
} }
if (perform_callbacks) { if (perform_callbacks) {
run_callbacks(t, cb); run_callbacks(t);
lock(t);
t->calling_back_ops = 0;
unlock(t);
unref_transport(t);
} }
if (call_closed) { if (call_closed) {
@ -878,9 +886,9 @@ static void unlock(transport *t) {
perform_write(t, ep); perform_write(t, ep);
} }
if (perform_callbacks || call_closed || num_goaways) { if (call_closed || num_goaways) {
lock(t); lock(t);
t->calling_back = 0; t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv); if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t); unlock(t);
unref_transport(t); unref_transport(t);
@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) {
return t->executing_callbacks.count > 0; return t->executing_callbacks.count > 0;
} }
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { static void run_callbacks(transport *t) {
size_t i; size_t i;
for (i = 0; i < t->executing_callbacks.count; i++) { for (i = 0; i < t->executing_callbacks.count; i++) {
op_closure c = t->executing_callbacks.callbacks[i]; op_closure c = t->executing_callbacks.callbacks[i];

@ -238,6 +238,7 @@ static int set_socket_nonblocking(thread_args *args) {
static int do_nothing(thread_args *args) { return 0; } static int do_nothing(thread_args *args) { return 0; }
#ifdef __linux__
/* Special case for epoll, where we need to create the fd ahead of time. */ /* Special case for epoll, where we need to create the fd ahead of time. */
static int epoll_setup(thread_args *args) { static int epoll_setup(thread_args *args) {
int epoll_fd; int epoll_fd;
@ -258,6 +259,7 @@ static int epoll_setup(thread_args *args) {
} }
return 0; return 0;
} }
#endif
static void server_thread(thread_args *args) { static void server_thread(thread_args *args) {
char *buf = malloc(args->msg_size); char *buf = malloc(args->msg_size);

@ -112,7 +112,7 @@ static void QpsDriver() {
FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers); FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers);
GetReporter()->ReportQPS(*result); GetReporter()->ReportQPS(*result);
GetReporter()->ReportQPSPerCore(*result, server_config); GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result); GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result); GetReporter()->ReportTimes(*result);
} }

@ -67,7 +67,7 @@ static void RunQPS() {
const auto result = const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
GetReporter()->ReportQPSPerCore(*result, server_config); GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result); GetReporter()->ReportLatency(*result);
} }

@ -50,10 +50,9 @@ void CompositeReporter::ReportQPS(const ScenarioResult& result) const {
} }
} }
void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result, void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result) const {
const ServerConfig& config) const {
for (size_t i = 0; i < reporters_.size(); ++i) { for (size_t i = 0; i < reporters_.size(); ++i) {
reporters_[i]->ReportQPSPerCore(result, config); reporters_[i]->ReportQPSPerCore(result);
} }
} }
@ -77,15 +76,14 @@ void GprLogReporter::ReportQPS(const ScenarioResult& result) const {
[](ResourceUsage u) { return u.wall_time; })); [](ResourceUsage u) { return u.wall_time; }));
} }
void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result, void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) const {
const ServerConfig& server_config) const {
auto qps = auto qps =
result.latencies.Count() / result.latencies.Count() /
average(result.client_resources, average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }); [](ResourceUsage u) { return u.wall_time; });
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / server_config.threads()); qps / result.server_config.threads());
} }
void GprLogReporter::ReportLatency(const ScenarioResult& result) const { void GprLogReporter::ReportLatency(const ScenarioResult& result) const {

@ -62,8 +62,7 @@ class Reporter {
virtual void ReportQPS(const ScenarioResult& result) const = 0; virtual void ReportQPS(const ScenarioResult& result) const = 0;
/** Reports QPS per core as (YYY/server core). */ /** Reports QPS per core as (YYY/server core). */
virtual void ReportQPSPerCore(const ScenarioResult& result, virtual void ReportQPSPerCore(const ScenarioResult& result) const = 0;
const ServerConfig& config) const = 0;
/** Reports latencies for the 50, 90, 95, 99 and 99.9 percentiles, in ms. */ /** Reports latencies for the 50, 90, 95, 99 and 99.9 percentiles, in ms. */
virtual void ReportLatency(const ScenarioResult& result) const = 0; virtual void ReportLatency(const ScenarioResult& result) const = 0;
@ -84,8 +83,7 @@ class CompositeReporter : public Reporter {
void add(std::unique_ptr<Reporter> reporter); void add(std::unique_ptr<Reporter> reporter);
void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE;
void ReportQPSPerCore(const ScenarioResult& result, void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE;
const ServerConfig& config) const GRPC_OVERRIDE;
void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE;
void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE;
@ -100,8 +98,7 @@ class GprLogReporter : public Reporter {
private: private:
void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE;
void ReportQPSPerCore(const ScenarioResult& result, void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE;
const ServerConfig& config) const GRPC_OVERRIDE;
void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE;
void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE;
}; };

@ -101,10 +101,11 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok); bool still_going = ctx->RunNextState(ok);
std::lock_guard<std::mutex> g(shutdown_mutex_); std::unique_lock<std::mutex> g(shutdown_mutex_);
if (!shutdown_) { if (!shutdown_) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
if (!still_going) { if (!still_going) {
g.unlock();
ctx->Reset(); ctx->Reset();
} }
} else { } else {

Loading…
Cancel
Save