From cbb384007ed9f0c5409c7f7eb99187d018c87b4c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 21 Jul 2017 09:52:25 -0700 Subject: [PATCH] Fix bm_chttp2_transport --- .../microbenchmarks/bm_chttp2_transport.cc | 121 +++++++++++------- 1 file changed, 75 insertions(+), 46 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 567ef1cf24c..4f80ff68f37 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template +std::unique_ptr MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr(new C(f, sched)); +} + +template +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + delete static_cast(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -182,8 +219,12 @@ class Stream { } void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast(stream_), closure); + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(f_->exec_ctx(), &refcount_, "DestroyThen"); +#else + grpc_stream_unref(f_->exec_ctx(), &refcount_); +#endif } void Op(grpc_transport_stream_op_batch *op) { @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template -std::unique_ptr MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr(new C(f, sched)); -} - -template -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast(arg)->f_(exec_ctx, error); - delete static_cast(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,10 +263,17 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); + s.Op(&op); s.DestroyThen(next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); @@ -350,6 +374,10 @@ static void BM_TransportEmptyOp(benchmark::State &state) { }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); s.DestroyThen( MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); f.FlushExecCtx(); @@ -360,8 +388,8 @@ BENCHMARK(BM_TransportEmptyOp); static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; auto reset_op = [&]() { @@ -391,30 +419,31 @@ static void BM_TransportStreamSend(benchmark::State &state) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; // force outgoing window to be yuge - s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; + s->chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); reset_op(); op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(&op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(&op); f.FlushExecCtx(); reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( + s->Op(&op); + s->DestroyThen( MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer);