From 17f3880ee15a19ddafd37a1f542d4beac3d7f117 Mon Sep 17 00:00:00 2001 From: Alisha Nanda Date: Fri, 1 Apr 2022 09:21:53 -0700 Subject: [PATCH] Refactor tcp_read (#29196) * Refactor tcp read somewhat * Fix memory leak from error and remove error creation from hot path --- src/core/lib/iomgr/tcp_posix.cc | 56 ++++++++++++++++----------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index cb84bbc6854..da8a6577af8 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -661,9 +661,8 @@ static void tcp_destroy(grpc_endpoint* ep) { TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) { +static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) { grpc_closure* cb = tcp->read_cb; - if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; @@ -678,14 +677,11 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) { } } } - - tcp->read_cb = nullptr; - tcp->incoming_buffer = nullptr; - grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } +/* Returns true if data available to read or error other than EAGAIN. */ #define MAX_READ_IOVEC 4 -static void tcp_do_read(grpc_tcp* tcp) { +static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) { GPR_TIMER_SCOPE("tcp_do_read", 0); struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; @@ -748,15 +744,12 @@ static void tcp_do_read(grpc_tcp* tcp) { if (errno == EAGAIN) { finish_estimate(tcp); tcp->inq = 0; - /* We've consumed the edge, request a new one */ - notify_on_read(tcp); + return false; } else { grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); - call_read_cb(tcp, - tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); - TCP_UNREF(tcp, "read"); + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp); + return true; } - return; } if (read_bytes == 0) { /* 0 read size ==> end of stream @@ -765,11 +758,9 @@ static void tcp_do_read(grpc_tcp* tcp) { * since the connection is closed we will drop the data here, because we * can't call the callback multiple times. */ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); - call_read_cb( - tcp, tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); - TCP_UNREF(tcp, "read"); - return; + *error = tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp); + return true; } GRPC_STATS_INC_TCP_READ_SIZE(read_bytes); @@ -828,11 +819,11 @@ static void tcp_do_read(grpc_tcp* tcp) { tcp->incoming_buffer->length - total_read_bytes, &tcp->last_read_buffer); } - call_read_cb(tcp, GRPC_ERROR_NONE); - TCP_UNREF(tcp, "read"); + *error = GRPC_ERROR_NONE; + return true; } -static void tcp_continue_read(grpc_tcp* tcp) { +static void maybe_make_read_slices(grpc_tcp* tcp) { if (tcp->incoming_buffer->length == 0 && tcp->incoming_buffer->count < MAX_READ_IOVEC) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { @@ -855,7 +846,6 @@ static void tcp_continue_read(grpc_tcp* tcp) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "TCP:%p do_read", tcp); } - tcp_do_read(tcp); } static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { @@ -864,15 +854,25 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_std_string(error).c_str()); } - - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + grpc_error_handle tcp_read_error; + if (GPR_LIKELY(error == GRPC_ERROR_NONE)) { + maybe_make_read_slices(tcp); + if (!tcp_do_read(tcp, &tcp_read_error)) { + /* We've consumed the edge, request a new one */ + notify_on_read(tcp); + return; + } + tcp_trace_read(tcp, tcp_read_error); + } else { + tcp_read_error = GRPC_ERROR_REF(error); grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); - call_read_cb(tcp, GRPC_ERROR_REF(error)); - TCP_UNREF(tcp, "read"); - } else { - tcp_continue_read(tcp); } + grpc_closure* cb = tcp->read_cb; + tcp->read_cb = nullptr; + tcp->incoming_buffer = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error); + TCP_UNREF(tcp, "read"); } static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,