@@ -61,6 +61,7 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/resource_quota/api.h"
 #include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/trace.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 
@@ -365,6 +366,7 @@ struct grpc_tcp {
   /* Used by the endpoint read function to distinguish the very first read call
    * from the rest */
   bool is_first_read;
+  bool has_posted_reclaimer;
   double target_length;
   double bytes_read_this_round;
   grpc_core::RefCount refcount;
@@ -376,7 +378,8 @@ struct grpc_tcp {
   /* garbage after the last read */
   grpc_slice_buffer last_read_buffer;
 
-  grpc_slice_buffer* incoming_buffer;
+  absl::Mutex read_mu;
+  grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
   int inq; /* bytes pending on the socket from the last read. */
   bool inq_capable; /* cache whether kernel supports inq */
 
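
The new `absl::Mutex` plus `ABSL_GUARDED_BY` opts `incoming_buffer` into
Clang's thread-safety analysis: compiled with `-Wthread-safety`, any access to
the pointer without `read_mu` held becomes a compile-time warning, which is
what lets the later hunks annotate whole functions with their locking
contracts. A minimal standalone sketch of the pattern (illustration only, not
gRPC code):

```cpp
// Compile with clang -Wthread-safety; requires Abseil.
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Endpoint {
 public:
  // Caller must NOT hold mu_; this method acquires it itself.
  void Reset() ABSL_LOCKS_EXCLUDED(mu_) {
    absl::MutexLock lock(&mu_);
    buffer_ = nullptr;  // OK: mu_ is held here.
  }
  // Caller MUST already hold mu_ when calling this.
  void ClearLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { buffer_ = nullptr; }

 private:
  absl::Mutex mu_;
  // Any unsynchronized touch of buffer_ is flagged by the compiler.
  void* buffer_ ABSL_GUARDED_BY(mu_) = nullptr;
};
```
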
@@ -661,7 +664,34 @@ static void tcp_destroy(grpc_endpoint* ep) {
   TCP_UNREF(tcp, "destroy");
 }
 
-static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
+static void perform_reclamation(grpc_tcp* tcp)
+    ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "TCP: benign reclamation to free memory");
+  }
+  tcp->read_mu.Lock();
+  if (tcp->incoming_buffer != nullptr) {
+    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
+  }
+  tcp->read_mu.Unlock();
+  tcp->has_posted_reclaimer = false;
+}
+
+static void maybe_post_reclaimer(grpc_tcp* tcp)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
+  if (!tcp->has_posted_reclaimer) {
+    tcp->has_posted_reclaimer = true;
+    tcp->memory_owner.PostReclaimer(
+        grpc_core::ReclamationPass::kBenign,
+        [tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
+          if (!sweep.has_value()) return;
+          perform_reclamation(tcp);
+        });
+  }
+}
+
+static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   grpc_closure* cb = tcp->read_cb;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
     gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
@@ -681,8 +711,12 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
 
 /* Returns true if data available to read or error other than EAGAIN. */
 #define MAX_READ_IOVEC 4
-static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
+static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   GPR_TIMER_SCOPE("tcp_do_read", 0);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
+    gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
+  }
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
   ssize_t read_bytes;
@@ -823,7 +857,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
   return true;
 }
 
-static void maybe_make_read_slices(grpc_tcp* tcp) {
+static void maybe_make_read_slices(grpc_tcp* tcp)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   if (tcp->incoming_buffer->length == 0 &&
       tcp->incoming_buffer->count < MAX_READ_IOVEC) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
@@ -842,9 +877,7 @@ static void maybe_make_read_slices(grpc_tcp* tcp) {
             tcp->min_read_chunk_size,
             grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
                              tcp->max_read_chunk_size))));
   }
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
-    gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
-  }
+  maybe_post_reclaimer(tcp);
 }
 
@@ -854,11 +887,13 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
     gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
             grpc_error_std_string(error).c_str());
   }
+  tcp->read_mu.Lock();
   grpc_error_handle tcp_read_error;
   if (GPR_LIKELY(error == GRPC_ERROR_NONE)) {
     maybe_make_read_slices(tcp);
     if (!tcp_do_read(tcp, &tcp_read_error)) {
       /* We've consumed the edge, request a new one */
+      tcp->read_mu.Unlock();
       notify_on_read(tcp);
       return;
     }
@@ -871,6 +906,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
   grpc_closure* cb = tcp->read_cb;
   tcp->read_cb = nullptr;
   tcp->incoming_buffer = nullptr;
+  tcp->read_mu.Unlock();
   grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
   TCP_UNREF(tcp, "read");
 }
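
Both new `Unlock()` calls drop `read_mu` before control leaves the read path:
once before `notify_on_read` re-arms the poller, and once before
`grpc_core::Closure::Run` invokes the application's read callback. Either
continuation can synchronously re-enter `tcp_read`, which now takes the same
mutex, and `absl::Mutex` is not reentrant, so holding the lock across those
calls could self-deadlock. A sketch of that unlock-before-callback discipline
(standalone illustration, invented names):

```cpp
#include <functional>
#include <utility>
#include "absl/synchronization/mutex.h"

class Reader {
 public:
  void FinishRead() {
    std::function<void()> cb;
    {
      absl::MutexLock lock(&mu_);
      cb = std::move(read_cb_);  // detach state while locked
      read_cb_ = nullptr;
    }              // release mu_ first...
    if (cb) cb();  // ...because cb may call StartRead() immediately
  }
  void StartRead(std::function<void()> cb) {
    absl::MutexLock lock(&mu_);  // would self-deadlock if mu_ were still held
    read_cb_ = std::move(cb);
  }

 private:
  absl::Mutex mu_;
  std::function<void()> read_cb_;
};
```
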
@@ -880,9 +916,11 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
   GPR_ASSERT(tcp->read_cb == nullptr);
   tcp->read_cb = cb;
+  tcp->read_mu.Lock();
   tcp->incoming_buffer = incoming_buffer;
   grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
   grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
+  tcp->read_mu.Unlock();
   TCP_REF(tcp, "read");
   if (tcp->is_first_read) {
     /* Endpoint read called for the very first time. Register read callback with
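
With the lock held, `tcp_read` publishes the caller's slice buffer, clears it,
and swaps in `last_read_buffer`, so slices left over from the previous read
are handed to the next one instead of being freed and reallocated. A rough
analogy, with `std::vector` standing in for `grpc_slice_buffer`:

```cpp
#include <utility>
#include <vector>

struct ReadState {
  std::vector<char> last_read_buffer;  // "garbage" kept from the last read

  void BeginRead(std::vector<char>* incoming) {
    incoming->clear();                       // ~grpc_slice_buffer_reset_and_unref
    std::swap(*incoming, last_read_buffer);  // ~grpc_slice_buffer_swap: recycle
  }
};
```
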
@@ -1741,13 +1779,13 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
   tcp->current_zerocopy_send = nullptr;
   tcp->release_fd_cb = nullptr;
   tcp->release_fd = nullptr;
-  tcp->incoming_buffer = nullptr;
   tcp->target_length = static_cast<double>(tcp_read_chunk_size);
   tcp->min_read_chunk_size = tcp_min_read_chunk_size;
   tcp->max_read_chunk_size = tcp_max_read_chunk_size;
   tcp->bytes_read_this_round = 0;
   /* Will be set to false by the very first endpoint read function */
   tcp->is_first_read = true;
+  tcp->has_posted_reclaimer = false;
   tcp->bytes_counter = -1;
   tcp->socket_ts_enabled = false;
   tcp->ts_capable = true;