diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d6b79bd4923..30738080ee7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -884,14 +884,23 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, GPR_TIMER_BEGIN("write_action_begin_locked", 0); grpc_chttp2_transport *t = gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); - if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "begin writing"); - grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); - } else { - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, - "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE + : grpc_chttp2_begin_write(exec_ctx, t)) { + case GRPC_CHTTP2_NOTHING_TO_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + break; + case GRPC_CHTTP2_PARTIAL_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + "begin writing partial"); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); + break; + case GRPC_CHTTP2_FULL_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + "begin writing"); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); + break; } GPR_TIMER_END("write_action_begin_locked", 0); } @@ -2130,27 +2139,29 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bdp_dbl) { - uint32_t bdp; - if (bdp_dbl <= 0) { - bdp = 0; - } else if (bdp_dbl > UINT32_MAX) { - bdp = UINT32_MAX; + int32_t bdp; + const int32_t kMinBDP = 128; + if (bdp_dbl <= kMinBDP) { + bdp = kMinBDP; + } else if (bdp_dbl > INT32_MAX) { + bdp = INT32_MAX; } else { - bdp = (uint32_t)(bdp_dbl); + bdp = (int32_t)(bdp_dbl); } int64_t delta = (int64_t)bdp - (int64_t)t->settings[GRPC_LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { + if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) { return; } if (grpc_bdp_estimator_trace) { gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, (int)bdp); } - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, bdp); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + (uint32_t)bdp); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, (uint32_t)bdp); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2543,7 +2554,7 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, add_max_recv_bytes); if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - (int64_t)s->announce_window > - (int64_t)initial_window_size / 2) { + 2 * (int64_t)initial_window_size) { write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; } grpc_chttp2_become_writable(exec_ctx, t, s, write_type, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0aaa4aebe5f..733e9900ef1 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -552,9 +552,14 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, bool covered_by_poller, const char *reason); -/** Someone is unlocking the transport mutex: check to see if writes - are required, and frame them if so */ -bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +typedef enum { + GRPC_CHTTP2_NOTHING_TO_WRITE, + GRPC_CHTTP2_PARTIAL_WRITE, + GRPC_CHTTP2_FULL_WRITE, +} grpc_chttp2_begin_write_result; + +grpc_chttp2_begin_write_result grpc_chttp2_begin_write( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 638b137316f..d0b94bd6ced 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -418,12 +418,7 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, incoming_frame_size); - if ((int64_t)t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + - (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= - (int64_t)t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / - 2) { + if ((int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= 0) { grpc_chttp2_become_writable(exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, "window-update-required"); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 069780ae5af..60db567fb32 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -160,19 +160,22 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } +/* How many bytes of incoming flow control would we like to advertise */ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { - return (uint32_t)GPR_MAX( + return (uint32_t)GPR_MIN( (int64_t)((1u << 31) - 1), t->stream_total_over_incoming_window + - (int64_t)GPR_MAX( - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - t->stream_total_under_incoming_window, - 0)); + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } -bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +/* How many bytes would we like to put on the wire during a single syscall */ +static uint32_t target_write_size(grpc_chttp2_transport *t) { + return 1024 * 1024; +} + +grpc_chttp2_begin_write_result grpc_chttp2_begin_write( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); @@ -206,9 +209,20 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, } } + bool partial_write = false; + /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (grpc_chttp2_list_pop_writable_stream(t, &s)) { + while (true) { + if (t->outbuf.length > target_write_size(t)) { + partial_write = true; + break; + } + + if (!grpc_chttp2_list_pop_writable_stream(t, &s)) { + break; + } + bool sent_initial_metadata = s->sent_initial_metadata; bool now_writing = false; @@ -395,7 +409,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_begin_write", 0); - return t->outbuf.count > 0; + return t->outbuf.count > 0 ? (partial_write ? GRPC_CHTTP2_PARTIAL_WRITE + : GRPC_CHTTP2_FULL_WRITE) + : GRPC_CHTTP2_NOTHING_TO_WRITE; } void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c index e1483677fd3..536694214e2 100644 --- a/src/core/lib/transport/bdp_estimator.c +++ b/src/core/lib/transport/bdp_estimator.c @@ -44,6 +44,7 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { estimator->estimate = 65536; estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; estimator->name = name; + estimator->bw_est = 0; } bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, @@ -84,16 +85,26 @@ void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED); estimator->ping_state = GRPC_BDP_PING_STARTED; estimator->accumulator = 0; + estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC); } void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { + gpr_timespec dt_ts = + gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), estimator->ping_start_time); + double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; + double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0; if (grpc_bdp_estimator_trace) { - gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64, - estimator->name, estimator->accumulator, estimator->estimate); + gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 + " dt=%lf bw=%lfMbs bw_est=%lfMbs", + estimator->name, estimator->accumulator, estimator->estimate, dt, + bw / 125000.0, estimator->bw_est / 125000.0); } GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED); - if (estimator->accumulator > 2 * estimator->estimate / 3) { - estimator->estimate *= 2; + if (estimator->accumulator > 2 * estimator->estimate / 3 && + bw > estimator->bw_est) { + estimator->estimate = + GPR_MAX(estimator->accumulator, estimator->estimate * 2); + estimator->bw_est = bw; if (grpc_bdp_estimator_trace) { gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, estimator->name, estimator->estimate); diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index df8d1f6fc0f..752bee1abda 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H #define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H +#include #include #include @@ -52,6 +53,8 @@ typedef struct grpc_bdp_estimator { grpc_bdp_estimator_ping_state ping_state; int64_t accumulator; int64_t estimate; + gpr_timespec ping_start_time; + double bw_est; const char *name; } grpc_bdp_estimator; diff --git a/test/core/transport/bdp_estimator_test.c b/test/core/transport/bdp_estimator_test.c index f55a3ca6439..a6f1a553633 100644 --- a/test/core/transport/bdp_estimator_test.c +++ b/test/core/transport/bdp_estimator_test.c @@ -33,6 +33,7 @@ #include "src/core/lib/transport/bdp_estimator.h" +#include #include #include #include @@ -64,6 +65,8 @@ static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples, GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) == false); } + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1, GPR_TIMESPAN))); grpc_bdp_estimator_complete_ping(estimator); } @@ -123,24 +126,25 @@ static void test_get_estimate_random_values(size_t n) { gpr_log(GPR_INFO, "test_get_estimate_random_values(%" PRIdPTR ")", n); grpc_bdp_estimator est; grpc_bdp_estimator_init(&est, "test"); - int min = INT_MAX; - int max = 65535; // Windows rand() has limited range, make sure the ASSERT - // passes + const int kMaxSample = 65535; + int min = kMaxSample; + int max = 0; for (size_t i = 0; i < n; i++) { - int sample = rand(); + int sample = rand() % (kMaxSample + 1); if (sample < min) min = sample; if (sample > max) max = sample; add_sample(&est, sample); if (i >= 3) { gpr_log(GPR_DEBUG, "est:%" PRId64 " min:%d max:%d", get_estimate(&est), min, max); - GPR_ASSERT(get_estimate(&est) <= 2 * next_pow_2(max)); + GPR_ASSERT(get_estimate(&est) <= GPR_MAX(65536, 2 * next_pow_2(max))); } } } int main(int argc, char **argv) { grpc_test_init(argc, argv); + grpc_init(); test_noop(); test_get_estimate_no_samples(); test_get_estimate_1_sample(); @@ -149,5 +153,6 @@ int main(int argc, char **argv) { for (size_t i = 3; i < 1000; i = i * 3 / 2) { test_get_estimate_random_values(i); } + grpc_shutdown(); return 0; } diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 58ac59711be..69386a07181 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -44,6 +44,8 @@ #include #include "src/core/lib/slice/slice_internal.h" +#define WRITE_BUFFER_SIZE (2 * 1024 * 1024) + typedef struct { grpc_endpoint base; double bytes_per_second; @@ -55,6 +57,7 @@ typedef struct { grpc_slice_buffer writing_buffer; grpc_error *error; bool writing; + grpc_closure *write_cb; } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -63,10 +66,20 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); } +static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, + trickle_endpoint *te) { + if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE || + te->write_buffer.length <= WRITE_BUFFER_SIZE)) { + grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error)); + te->write_cb = NULL; + } +} + static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); + GPR_ASSERT(te->write_cb == NULL); if (te->write_buffer.length == 0) { te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); } @@ -74,7 +87,8 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer_add(&te->write_buffer, grpc_slice_copy(slices->slices[i])); } - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); + te->write_cb = cb; + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); } @@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); } + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); } @@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; + te->write_cb = NULL; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -187,9 +203,18 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint_write( exec_ctx, te->wrapped, &te->writing_buffer, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + maybe_call_write_cb_locked(exec_ctx, te); } } size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); return backlog; } + +size_t grpc_trickle_get_backlog(grpc_endpoint *ep) { + trickle_endpoint *te = (trickle_endpoint *)ep; + gpr_mu_lock(&te->mu); + size_t backlog = te->write_buffer.length; + gpr_mu_unlock(&te->mu); + return backlog; +} diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h index 7e8d9d91e33..e513774eb4a 100644 --- a/test/core/util/trickle_endpoint.h +++ b/test/core/util/trickle_endpoint.h @@ -43,4 +43,6 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint); +size_t grpc_trickle_get_backlog(grpc_endpoint *endpoint); + #endif diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index cae3fa1a146..208ac6d794b 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -92,7 +92,7 @@ cc_test( cc_test( name = "bm_fullstack_trickle", srcs = ["bm_fullstack_trickle.cc"], - deps = [":helpers"], + deps = [":helpers", "//external:gflags"], ) cc_test( diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index a5cfeb4f955..fc99b06dbb2 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -34,6 +34,8 @@ /* Benchmark gRPC end2end in various configurations */ #include +#include +#include #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -45,16 +47,57 @@ extern "C" { #include "test/core/util/trickle_endpoint.h" } +DEFINE_bool(log, false, "Log state to CSV files"); +DEFINE_int32( + warmup_megabytes, 1, + "Number of megabytes to pump before collecting flow control stats"); +DEFINE_int32( + warmup_iterations, 100, + "Number of megabytes to pump before collecting flow control stats"); +DEFINE_int32(warmup_max_time_seconds, 10, + "Maximum number of seconds to run warmup loop"); + namespace grpc { namespace testing { static void* tag(intptr_t x) { return reinterpret_cast(x); } +template +static void write_csv(std::ostream* out, A0&& a0) { + if (!out) return; + (*out) << a0 << "\n"; +} + +template +static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) { + if (!out) return; + (*out) << a0 << ","; + write_csv(out, std::forward(arg)...); +} + class TrickledCHTTP2 : public EndpointPairFixture { public: - TrickledCHTTP2(Service* service, size_t megabits_per_second) - : EndpointPairFixture(service, MakeEndpoints(megabits_per_second), - FixtureConfiguration()) {} + TrickledCHTTP2(Service* service, size_t message_size, + size_t kilobits_per_second) + : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second), + FixtureConfiguration()) { + if (FLAGS_log) { + std::ostringstream fn; + fn << "trickle." << message_size << "." << kilobits_per_second << ".csv"; + log_.reset(new std::ofstream(fn.str().c_str())); + write_csv(log_.get(), "t", "iteration", "client_backlog", + "server_backlog", "client_t_stall", "client_s_stall", + "server_t_stall", "server_s_stall", "client_t_outgoing", + "server_t_outgoing", "client_t_incoming", "server_t_incoming", + "client_s_outgoing_delta", "server_s_outgoing_delta", + "client_s_incoming_delta", "server_s_incoming_delta", + "client_s_announce_window", "server_s_announce_window", + "client_peer_iws", "client_local_iws", "client_sent_iws", + "client_acked_iws", "server_peer_iws", "server_local_iws", + "server_sent_iws", "server_acked_iws", "client_queued_bytes", + "server_queued_bytes"); + } + } void AddToLabel(std::ostream& out, benchmark::State& state) { out << " writes/iter:" @@ -75,7 +118,58 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Step() { + void Log(int64_t iteration) { + auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); + grpc_chttp2_transport* client = + reinterpret_cast(client_transport_); + grpc_chttp2_transport* server = + reinterpret_cast(server_transport_); + grpc_chttp2_stream* client_stream = + client->stream_map.count == 1 + ? static_cast(client->stream_map.values[0]) + : nullptr; + grpc_chttp2_stream* server_stream = + server->stream_map.count == 1 + ? static_cast(server->stream_map.values[0]) + : nullptr; + write_csv( + log_.get(), static_cast(now.tv_sec) + + 1e-9 * static_cast(now.tv_nsec), + iteration, grpc_trickle_get_backlog(endpoint_pair_.client), + grpc_trickle_get_backlog(endpoint_pair_.server), + client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, + client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, + server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, + server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, + client->outgoing_window, server->outgoing_window, + client->incoming_window, server->incoming_window, + client_stream ? client_stream->outgoing_window_delta : -1, + server_stream ? server_stream->outgoing_window_delta : -1, + client_stream ? client_stream->incoming_window_delta : -1, + server_stream ? server_stream->incoming_window_delta : -1, + client_stream ? client_stream->announce_window : -1, + server_stream ? server_stream->announce_window : -1, + client->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client_stream ? client_stream->flow_controlled_buffer.length : 0, + server_stream ? server_stream->flow_controlled_buffer.length : 0); + } + + void Step(bool update_stats) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t client_backlog = grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client); @@ -83,10 +177,12 @@ class TrickledCHTTP2 : public EndpointPairFixture { grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server); grpc_exec_ctx_finish(&exec_ctx); - UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, - client_backlog); - UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_, - server_backlog); + if (update_stats) { + UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, + client_backlog); + UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_, + server_backlog); + } } private: @@ -97,6 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { }; Stats client_stats_; Stats server_stats_; + std::unique_ptr log_; + gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC); grpc_endpoint_pair MakeEndpoints(size_t kilobits) { grpc_endpoint_pair p; @@ -123,13 +221,15 @@ class TrickledCHTTP2 : public EndpointPairFixture { // force library initialization auto& force_library_initialization = Library::get(); -static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) { +static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, + int64_t iteration) { while (true) { + fixture->Log(iteration); switch (fixture->cq()->AsyncNext( t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(100, GPR_TIMESPAN)))) { case CompletionQueue::TIMEOUT: - fixture->Step(); + fixture->Step(iteration != -1); break; case CompletionQueue::SHUTDOWN: GPR_ASSERT(false); @@ -143,7 +243,7 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) { static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr fixture( - new TrickledCHTTP2(&service, state.range(1))); + new TrickledCHTTP2(&service, state.range(0), state.range(1))); { EchoResponse send_response; EchoResponse recv_response; @@ -163,18 +263,19 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { void* t; bool ok; while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, -1); GPR_ASSERT(ok); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } request_rw->Read(&recv_response, tag(0)); - while (state.KeepRunning()) { + auto inner_loop = [&](bool in_warmup) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); response_rw.Write(send_response, tag(1)); while (true) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, + in_warmup ? -1 : state.iterations()); if (t == tag(0)) { request_rw->Read(&recv_response, tag(0)); } else if (t == tag(1)) { @@ -183,11 +284,26 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { GPR_ASSERT(false); } } + }; + gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); + for (int i = 0; + i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * + 1024 / (14 + state.range(0))); + i++) { + inner_loop(true); + if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), + gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, + GPR_TIMESPAN)) > 0) { + break; + } + } + while (state.KeepRunning()) { + inner_loop(false); } response_rw.Finish(Status::OK, tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, -1); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); @@ -204,10 +320,10 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { static void TrickleArgs(benchmark::internal::Benchmark* b) { for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { - for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) { + for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { double expected_time = static_cast(14 + i) / (125.0 * static_cast(j)); - if (expected_time > 0.01) continue; + if (expected_time > 2.0) continue; b->Args({i, j}); } } @@ -217,4 +333,8 @@ BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs); } } -BENCHMARK_MAIN(); +int main(int argc, char** argv) { + ::benchmark::Initialize(&argc, argv); + ::google::ParseCommandLineFlags(&argc, &argv, false); + ::benchmark::RunSpecifiedBenchmarks(); +} diff --git a/tools/profiling/microbenchmarks/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff.py index b2d6f460475..299abb5fdb7 100755 --- a/tools/profiling/microbenchmarks/bm_diff.py +++ b/tools/profiling/microbenchmarks/bm_diff.py @@ -56,6 +56,10 @@ _INTERESTING = ( 'writes_per_iteration', 'atm_cas_per_iteration', 'atm_add_per_iteration', + 'cli_transport_stalls_per_iteration', + 'cli_stream_stalls_per_iteration', + 'svr_transport_stalls_per_iteration', + 'svr_stream_stalls_per_iteration' 'nows_per_iteration', )