diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 6d9e044fa2e..d09cfca9af4 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -81,9 +81,7 @@ typedef struct { grpc_slice_buffer* incoming_buffer; grpc_slice_buffer* outgoing_buffer; - /** slice within outgoing_buffer to write next */ - size_t outgoing_slice_idx; - /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + /** byte within outgoing_buffer->slices[0] to write next */ size_t outgoing_byte_idx; grpc_closure* read_cb; @@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, size_t unwind_slice_idx; size_t unwind_byte_idx; + // We always start at zero, because we eagerly unref and trim the slice + // buffer as we write + size_t outgoing_slice_idx = 0; + for (;;) { sending_length = 0; - unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; - for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count && iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GRPC_SLICE_START_PTR( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_buffer->slices[outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; - tcp->outgoing_slice_idx++; + outgoing_slice_idx++; tcp->outgoing_byte_idx = 0; } GPR_ASSERT(iov_size > 0); @@ -574,16 +575,25 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (sent_length < 0) { if (errno == EAGAIN) { - tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; + // unref all and forget about all slices that have been written to this + // point + for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { + grpc_slice_unref_internal( + exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer)); + } return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } } @@ -593,9 +603,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, while (trailing > 0) { size_t slice_length; - tcp->outgoing_slice_idx--; - slice_length = GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + outgoing_slice_idx--; + slice_length = + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; break; @@ -604,11 +614,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, } } - if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { + if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } - }; + } } static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, @@ -672,7 +684,6 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, return; } tcp->outgoing_buffer = buf; - tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; if (!tcp_flush(exec_ctx, tcp, &error)) { diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index 2439fc08267..10527dcdeb5 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -32,6 +32,9 @@ grpc_slice grpc_slice_ref_internal(grpc_slice slice); void grpc_slice_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice slice); void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); +void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* sb, + size_t idx); void grpc_slice_buffer_destroy_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 0bdd15c57d5..ac0c953a79f 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -88,9 +88,11 @@ typedef struct proxy_connection { grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_deferred_write_buffer; + bool client_is_writing; grpc_slice_buffer client_write_buffer; grpc_slice_buffer server_read_buffer; grpc_slice_buffer server_deferred_write_buffer; + bool server_is_writing; grpc_slice_buffer server_write_buffer; grpc_http_parser http_parser; @@ -148,6 +150,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy client write", error); @@ -160,6 +163,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, if (conn->client_deferred_write_buffer.length > 0) { grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_client_write_done); @@ -173,6 +177,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, false /* is_client */, "HTTP proxy server write", error); @@ -185,6 +190,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, if (conn->server_deferred_write_buffer.length > 0) { grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); + conn->server_is_writing = true; grpc_endpoint_write(exec_ctx, conn->server_endpoint, &conn->server_write_buffer, &conn->on_server_write_done); @@ -210,13 +216,14 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, // the current write is finished. // // Otherwise, move the read data into the write buffer and write it. - if (conn->server_write_buffer.length > 0) { + if (conn->server_is_writing) { grpc_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_deferred_write_buffer); } else { grpc_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); + conn->server_is_writing = true; grpc_endpoint_write(exec_ctx, conn->server_endpoint, &conn->server_write_buffer, &conn->on_server_write_done); @@ -242,13 +249,14 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, // the current write is finished. // // Otherwise, move the read data into the write buffer and write it. - if (conn->client_write_buffer.length > 0) { + if (conn->client_is_writing) { grpc_slice_buffer_move_into(&conn->server_read_buffer, &conn->client_deferred_write_buffer); } else { grpc_slice_buffer_move_into(&conn->server_read_buffer, &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_client_write_done); @@ -262,6 +270,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy write response", error); @@ -302,6 +311,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice slice = grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_write_response_done); @@ -450,9 +460,11 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_combiner_scheduler(conn->proxy->combiner)); grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer); + conn->client_is_writing = false; grpc_slice_buffer_init(&conn->client_write_buffer); grpc_slice_buffer_init(&conn->server_read_buffer); grpc_slice_buffer_init(&conn->server_deferred_write_buffer); + conn->server_is_writing = false; grpc_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request);