From 1027149f8dd7e11b69dd399cd854aab3fea2f6e3 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Mon, 25 Feb 2019 18:53:16 -0500 Subject: [PATCH] Run run_after_write closures in h2 once write action is done. We flush these closures only when the connection goes IDLE. This will cause no completion being sent, if we have a continuous stream of bytes that never stops, causing a memory bloat because we never call the callbacks of the ops. For example, we use 100s of GiB of memory after a minute of exchanging 1MiB RPCs with callback API. This patch runs the closures when we have done running one write action. After this change memory remains stable for the 1MiB benchmark. QPS is increased by 200 QPS (520 -> 749), and latency is dropped by 70ms, because we were basically page-faulting on every RPC. --- .../transport/chttp2/transport/chttp2_transport.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1c9b37dada2..970c71b663d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1062,12 +1062,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); + bool closed = false; if (error != GRPC_ERROR_NONE) { close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; } if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { close_transport_locked( t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); @@ -1086,6 +1089,14 @@ static void write_action_end_locked(void* tp, grpc_error* error) { set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); t->is_first_write_in_batch = false; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } GRPC_CLOSURE_RUN( GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t,