Run run_after_write closures in h2 once write action is done.

We flush these closures only when the connection goes IDLE.
This will cause no completion being sent, if we have a continuous
stream of bytes that never stops, causing a memory bloat because
we never call the callbacks of the ops.

For example, we use 100s of GiB of memory after a minute of exchanging
1MiB RPCs with callback API.

This patch runs the closures when we have done running
one write action.

After this change memory remains stable for the 1MiB benchmark.
QPS is increased by 200 QPS (520 -> 749), and latency is dropped
by 70ms, because we were basically page-faulting on every RPC.
pull/18163/head
Soheil Hassas Yeganeh 6 years ago
parent 6e7ef05bff
commit 1027149f8d
  1. 11
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@ -1062,12 +1062,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
bool closed = false;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
close_transport_locked(t, GRPC_ERROR_REF(error)); close_transport_locked(t, GRPC_ERROR_REF(error));
closed = true;
} }
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
closed = true;
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
close_transport_locked( close_transport_locked(
t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
@ -1086,6 +1089,14 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
t->is_first_write_in_batch = false; t->is_first_write_in_batch = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
// If the transport is closed, we will retry writing on the endpoint
// and next write may contain part of the currently serialized frames.
// So, we should only call the run_after_write callbacks when the next
// write finishes, or the callbacks will be invoked when the stream is
// closed.
if (!closed) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
}
GRPC_CLOSURE_RUN( GRPC_CLOSURE_RUN(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked, GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t, write_action_begin_locked, t,

Loading…
Cancel
Save