Heap allocate the stream object for other benchmark cases too

pull/19342/head
Yash Tibrewal 6 years ago
parent cceca10a8a
commit 56a0153f16
  1. 60
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -259,18 +259,21 @@ static void BM_StreamCreateDestroy(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
Fixture f(grpc::ChannelArguments(), true); Fixture f(grpc::ChannelArguments(), true);
Stream s(&f); auto* s = new Stream(&f);
grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload(nullptr); grpc_transport_stream_op_batch_payload op_payload(nullptr);
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.cancel_stream = true; op.cancel_stream = true;
op.payload = &op_payload; op.payload = &op_payload;
op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
std::unique_ptr<Closure> next = MakeClosure([&](grpc_error* error) { std::unique_ptr<Closure> next = MakeClosure([&, s](grpc_error* error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) {
s.Init(state); delete s;
s.Op(&op); return;
s.DestroyThen(next.get()); }
s->Init(state);
s->Op(&op);
s->DestroyThen(next.get());
}); });
GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE); GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE);
f.FlushExecCtx(); f.FlushExecCtx();
@ -305,7 +308,7 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
Fixture f(grpc::ChannelArguments(), true); Fixture f(grpc::ChannelArguments(), true);
Stream s(&f); auto* s = new Stream(&f);
grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload(nullptr); grpc_transport_stream_op_batch_payload op_payload(nullptr);
std::unique_ptr<Closure> start; std::unique_ptr<Closure> start;
@ -327,21 +330,24 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) {
} }
f.FlushExecCtx(); f.FlushExecCtx();
start = MakeClosure([&](grpc_error* error) { start = MakeClosure([&, s](grpc_error* error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) {
s.Init(state); delete s;
return;
}
s->Init(state);
reset_op(); reset_op();
op.on_complete = done.get(); op.on_complete = done.get();
op.send_initial_metadata = true; op.send_initial_metadata = true;
op.payload->send_initial_metadata.send_initial_metadata = &b; op.payload->send_initial_metadata.send_initial_metadata = &b;
s.Op(&op); s->Op(&op);
}); });
done = MakeClosure([&](grpc_error* error) { done = MakeClosure([&](grpc_error* error) {
reset_op(); reset_op();
op.cancel_stream = true; op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op); s->Op(&op);
s.DestroyThen(start.get()); s->DestroyThen(start.get());
}); });
GRPC_CLOSURE_SCHED(start.get(), GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(start.get(), GRPC_ERROR_NONE);
f.FlushExecCtx(); f.FlushExecCtx();
@ -355,8 +361,8 @@ static void BM_TransportEmptyOp(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
Fixture f(grpc::ChannelArguments(), true); Fixture f(grpc::ChannelArguments(), true);
Stream s(&f); auto* s = new Stream(&f);
s.Init(state); s->Init(state);
grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload(nullptr); grpc_transport_stream_op_batch_payload op_payload(nullptr);
auto reset_op = [&]() { auto reset_op = [&]() {
@ -367,15 +373,15 @@ static void BM_TransportEmptyOp(benchmark::State& state) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) return;
reset_op(); reset_op();
op.on_complete = c.get(); op.on_complete = c.get();
s.Op(&op); s->Op(&op);
}); });
GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE);
f.FlushExecCtx(); f.FlushExecCtx();
reset_op(); reset_op();
op.cancel_stream = true; op.cancel_stream = true;
op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op); s->Op(&op);
s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); s->DestroyThen(MakeOnceClosure([s](grpc_error* error) { delete s; }));
f.FlushExecCtx(); f.FlushExecCtx();
track_counters.Finish(state); track_counters.Finish(state);
} }
@ -519,8 +525,8 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
Fixture f(grpc::ChannelArguments(), true); Fixture f(grpc::ChannelArguments(), true);
Stream s(&f); auto* s = new Stream(&f);
s.Init(state); s->Init(state);
grpc_transport_stream_op_batch_payload op_payload(nullptr); grpc_transport_stream_op_batch_payload op_payload(nullptr);
grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch op;
grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream; grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream;
@ -556,7 +562,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) return;
// force outgoing window to be yuge // force outgoing window to be yuge
s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
received = 0; received = 0;
reset_op(); reset_op();
@ -564,7 +570,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
op.recv_message = true; op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message_ready = drain_start.get(); op.payload->recv_message.recv_message_ready = drain_start.get();
s.Op(&op); s->Op(&op);
f.PushInput(grpc_slice_ref(incoming_data)); f.PushInput(grpc_slice_ref(incoming_data));
}); });
@ -605,7 +611,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
op.payload->recv_initial_metadata.recv_initial_metadata_ready = op.payload->recv_initial_metadata.recv_initial_metadata_ready =
do_nothing.get(); do_nothing.get();
op.on_complete = c.get(); op.on_complete = c.get();
s.Op(&op); s->Op(&op);
f.PushInput(SLICE_FROM_BUFFER( f.PushInput(SLICE_FROM_BUFFER(
"\x00\x00\x00\x04\x00\x00\x00\x00\x00" "\x00\x00\x00\x04\x00\x00\x00\x00\x00"
// Generated using: // Generated using:
@ -623,12 +629,12 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
reset_op(); reset_op();
op.cancel_stream = true; op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op); s->Op(&op);
s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); s->DestroyThen(MakeOnceClosure([s](grpc_error* error) { delete s; }));
f.FlushExecCtx();
track_counters.Finish(state);
grpc_metadata_batch_destroy(&b); grpc_metadata_batch_destroy(&b);
grpc_metadata_batch_destroy(&b_recv); grpc_metadata_batch_destroy(&b_recv);
f.FlushExecCtx();
track_counters.Finish(state);
grpc_slice_unref(incoming_data); grpc_slice_unref(incoming_data);
} }
BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024);

Loading…
Cancel
Save