Refactor tcp_read (#29196)

* Refactor tcp read somewhat

* Fix memory leak from error and remove error creation from hot path
pull/29281/head
Alisha Nanda 3 years ago committed by GitHub
parent 58612ba155
commit 17f3880ee1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      src/core/lib/iomgr/tcp_posix.cc

@ -661,9 +661,8 @@ static void tcp_destroy(grpc_endpoint* ep) {
TCP_UNREF(tcp, "destroy");
}
static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) {
static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
grpc_closure* cb = tcp->read_cb;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
@ -678,14 +677,11 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) {
}
}
}
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
/* Returns true if data available to read or error other than EAGAIN. */
#define MAX_READ_IOVEC 4
static void tcp_do_read(grpc_tcp* tcp) {
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
GPR_TIMER_SCOPE("tcp_do_read", 0);
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
@ -748,15 +744,12 @@ static void tcp_do_read(grpc_tcp* tcp) {
if (errno == EAGAIN) {
finish_estimate(tcp);
tcp->inq = 0;
/* We've consumed the edge, request a new one */
notify_on_read(tcp);
return false;
} else {
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
call_read_cb(tcp,
tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
TCP_UNREF(tcp, "read");
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
return true;
}
return;
}
if (read_bytes == 0) {
/* 0 read size ==> end of stream
@ -765,11 +758,9 @@ static void tcp_do_read(grpc_tcp* tcp) {
* since the connection is closed we will drop the data here, because we
* can't call the callback multiple times. */
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
call_read_cb(
tcp, tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
TCP_UNREF(tcp, "read");
return;
*error = tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp);
return true;
}
GRPC_STATS_INC_TCP_READ_SIZE(read_bytes);
@ -828,11 +819,11 @@ static void tcp_do_read(grpc_tcp* tcp) {
tcp->incoming_buffer->length - total_read_bytes,
&tcp->last_read_buffer);
}
call_read_cb(tcp, GRPC_ERROR_NONE);
TCP_UNREF(tcp, "read");
*error = GRPC_ERROR_NONE;
return true;
}
static void tcp_continue_read(grpc_tcp* tcp) {
static void maybe_make_read_slices(grpc_tcp* tcp) {
if (tcp->incoming_buffer->length == 0 &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
@ -855,7 +846,6 @@ static void tcp_continue_read(grpc_tcp* tcp) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
}
tcp_do_read(tcp);
}
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
@ -864,15 +854,25 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
grpc_error_std_string(error).c_str());
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
grpc_error_handle tcp_read_error;
if (GPR_LIKELY(error == GRPC_ERROR_NONE)) {
maybe_make_read_slices(tcp);
if (!tcp_do_read(tcp, &tcp_read_error)) {
/* We've consumed the edge, request a new one */
notify_on_read(tcp);
return;
}
tcp_trace_read(tcp, tcp_read_error);
} else {
tcp_read_error = GRPC_ERROR_REF(error);
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
call_read_cb(tcp, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "read");
} else {
tcp_continue_read(tcp);
}
grpc_closure* cb = tcp->read_cb;
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
TCP_UNREF(tcp, "read");
}
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,

Loading…
Cancel
Save