diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index fa5aaa9e7ce..2b1eb92bbd4 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -144,7 +144,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) { // The read callback inherits our ref to the handshaker. grpc_endpoint_read(handshaker->args_->endpoint, handshaker->args_->read_buffer, - &handshaker->response_read_closure_); + &handshaker->response_read_closure_, /*urgent=*/true); gpr_mu_unlock(&handshaker->mu_); } } @@ -207,7 +207,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) { grpc_slice_buffer_reset_and_unref_internal(handshaker->args_->read_buffer); grpc_endpoint_read(handshaker->args_->endpoint, handshaker->args_->read_buffer, - &handshaker->response_read_closure_); + &handshaker->response_read_closure_, /*urgent=*/true); gpr_mu_unlock(&handshaker->mu_); return; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 306349b7910..888c1757be1 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -2577,7 +2577,8 @@ static void read_action_locked(void* tp, grpc_error* error) { grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer); if (keep_reading) { - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked); + const bool urgent = t->goaway_error != GRPC_ERROR_NONE; + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading"); diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 8c9ce4da0d3..8a8da8b1604 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -121,7 +121,7 @@ static void append_error(internal_request* req, grpc_error* error) { } static void do_read(internal_request* req) { - grpc_endpoint_read(req->ep, &req->incoming, &req->on_read); + grpc_endpoint_read(req->ep, &req->incoming, &req->on_read, /*urgent=*/true); } static void on_read(void* user_data, grpc_error* error) { diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 06316c60315..bb07fe79608 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -23,8 +23,8 @@ grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - ep->vtable->read(ep, slices, cb); + grpc_closure* cb, bool urgent) { + ep->vtable->read(ep, slices, cb, urgent); } void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 79c8ece263a..932e7e15b9a 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -36,7 +36,8 @@ typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; class Timestamps; struct grpc_endpoint_vtable { - void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); + void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, + bool urgent); void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* arg); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); @@ -56,7 +57,7 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even when the callback is invoked with error != GRPC_ERROR_NONE. */ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb); + grpc_closure* cb, bool urgent); char* grpc_endpoint_get_peer(grpc_endpoint* ep); diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index 25146e7861c..6de22972dbf 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -251,7 +251,7 @@ static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { } static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 7b6ca1bc0e1..3248343e27c 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,6 +60,9 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +/* Linux has TCP_INQ support since 4.18, but it is safe to set + the socket option on older kernels. */ +#define GRPC_HAVE_TCP_INQ 1 #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) #define GRPC_LINUX_ERRQUEUE 1 diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index 1e5696e1279..f7ad120b026 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -192,7 +192,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { } static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep; GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == nullptr); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 525288a77ae..960a45b7b26 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include #include @@ -54,6 +56,15 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#ifndef SOL_TCP +#define SOL_TCP IPPROTO_TCP +#endif + +#ifndef TCP_INQ +#define TCP_INQ 36 +#define TCP_CM_INQ TCP_INQ +#endif + #ifdef GRPC_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else @@ -88,8 +99,11 @@ struct grpc_tcp { grpc_slice_buffer last_read_buffer; grpc_slice_buffer* incoming_buffer; + int inq; /* bytes pending on the socket from the last read. */ + bool inq_capable; /* cache whether kernel supports inq */ + grpc_slice_buffer* outgoing_buffer; - /** byte within outgoing_buffer->slices[0] to write next */ + /* byte within outgoing_buffer->slices[0] to write next */ size_t outgoing_byte_idx; grpc_closure* read_cb; @@ -429,69 +443,140 @@ static void tcp_do_read(grpc_tcp* tcp) { GPR_TIMER_SCOPE("tcp_do_read", 0); struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; + char cmsgbuf[24 /*CMSG_SPACE(sizeof(int))*/]; ssize_t read_bytes; - size_t i; - - GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); + size_t total_read_bytes = 0; - for (i = 0; i < tcp->incoming_buffer->count; i++) { + size_t iov_len = + std::min(MAX_READ_IOVEC, tcp->incoming_buffer->count); + for (size_t i = 0; i < iov_len; i++) { iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); } - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = static_cast(tcp->incoming_buffer->count); - msg.msg_control = nullptr; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length); - GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count); - do { - GPR_TIMER_SCOPE("recvmsg", 0); - GRPC_STATS_INC_SYSCALL_READ(); - read_bytes = recvmsg(tcp->fd, &msg, 0); - } while (read_bytes < 0 && errno == EINTR); - - if (read_bytes < 0) { - /* NB: After calling call_read_cb a parallel call of the read handler may - * be running. */ - if (errno == EAGAIN) { - finish_estimate(tcp); - /* We've consumed the edge, request a new one */ - notify_on_read(tcp); + /* Assume there is something on the queue. If we receive TCP_INQ from + * kernel, we will update this value, otherwise, we have to assume there is + * always something to read until we get EAGAIN. */ + tcp->inq = 1; + + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = static_cast(iov_len); + if (tcp->inq_capable) { + msg.msg_control = cmsgbuf; + msg.msg_controllen = sizeof(cmsgbuf); } else { + msg.msg_control = nullptr; + msg.msg_controllen = 0; + } + msg.msg_flags = 0; + + GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length); + GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count); + + do { + GPR_TIMER_SCOPE("recvmsg", 0); + GRPC_STATS_INC_SYSCALL_READ(); + read_bytes = recvmsg(tcp->fd, &msg, 0); + } while (read_bytes < 0 && errno == EINTR); + + /* We have read something in previous reads. We need to deliver those + * bytes to the upper layer. */ + if (read_bytes <= 0 && total_read_bytes > 0) { + tcp->inq = 1; + break; + } + + if (read_bytes < 0) { + /* NB: After calling call_read_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) { + finish_estimate(tcp); + tcp->inq = 0; + /* We've consumed the edge, request a new one */ + notify_on_read(tcp); + } else { + grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); + call_read_cb(tcp, + tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); + TCP_UNREF(tcp, "read"); + } + return; + } + if (read_bytes == 0) { + /* 0 read size ==> end of stream + * + * We may have read something, i.e., total_read_bytes > 0, but + * since the connection is closed we will drop the data here, because we + * can't call the callback multiple times. */ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); - call_read_cb(tcp, - tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); + call_read_cb( + tcp, tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(tcp, "read"); + return; } - } else if (read_bytes == 0) { - /* 0 read size ==> end of stream */ - grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); - call_read_cb( - tcp, tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); - TCP_UNREF(tcp, "read"); - } else { + GRPC_STATS_INC_TCP_READ_SIZE(read_bytes); add_to_estimate(tcp, static_cast(read_bytes)); - GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); - if (static_cast(read_bytes) == tcp->incoming_buffer->length) { - finish_estimate(tcp); - } else if (static_cast(read_bytes) < tcp->incoming_buffer->length) { - grpc_slice_buffer_trim_end( - tcp->incoming_buffer, - tcp->incoming_buffer->length - static_cast(read_bytes), - &tcp->last_read_buffer); + GPR_DEBUG_ASSERT((size_t)read_bytes <= + tcp->incoming_buffer->length - total_read_bytes); + +#ifdef GRPC_HAVE_TCP_INQ + if (tcp->inq_capable) { + GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC)); + struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); + for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ && + cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { + tcp->inq = *reinterpret_cast(CMSG_DATA(cmsg)); + } + } } - GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(tcp, GRPC_ERROR_NONE); - TCP_UNREF(tcp, "read"); +#endif /* GRPC_HAVE_TCP_INQ */ + + total_read_bytes += read_bytes; + if (tcp->inq == 0 || total_read_bytes == tcp->incoming_buffer->length) { + /* We have filled incoming_buffer, and we cannot read any more. */ + break; + } + + /* We had a partial read, and still have space to read more data. + * So, adjust IOVs and try to read more. */ + size_t remaining = read_bytes; + size_t j = 0; + for (size_t i = 0; i < iov_len; i++) { + if (remaining >= iov[i].iov_len) { + remaining -= iov[i].iov_len; + continue; + } + if (remaining > 0) { + iov[j].iov_base = static_cast(iov[i].iov_base) + remaining; + iov[j].iov_len = iov[i].iov_len - remaining; + remaining = 0; + } else { + iov[j].iov_base = iov[i].iov_base; + iov[j].iov_len = iov[i].iov_len; + } + ++j; + } + iov_len = j; + } while (true); + + if (tcp->inq == 0) { + finish_estimate(tcp); } + + GPR_DEBUG_ASSERT(total_read_bytes > 0); + if (total_read_bytes < tcp->incoming_buffer->length) { + grpc_slice_buffer_trim_end(tcp->incoming_buffer, + tcp->incoming_buffer->length - total_read_bytes, + &tcp->last_read_buffer); + } + call_read_cb(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "read"); } static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { @@ -512,7 +597,8 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { static void tcp_continue_read(grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); - if (tcp->incoming_buffer->length < target_read_size / 2 && + /* Wait for allocation only when there is no buffer left. */ + if (tcp->incoming_buffer->length == 0 && tcp->incoming_buffer->count < MAX_READ_IOVEC) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p alloc_slices", tcp); @@ -544,7 +630,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { } static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { grpc_tcp* tcp = reinterpret_cast(ep); GPR_ASSERT(tcp->read_cb == nullptr); tcp->read_cb = cb; @@ -557,6 +643,11 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, * the polling engine */ tcp->is_first_read = false; notify_on_read(tcp); + } else if (!urgent && tcp->inq == 0) { + /* Upper layer asked to read more but we know there is no pending data + * to read from previous reads. So, wait for POLLIN. + */ + notify_on_read(tcp); } else { /* Not the first time. We may or may not have more bytes available. In any * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the @@ -1157,6 +1248,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->tb_head = nullptr; GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, grpc_schedule_on_exec_ctx); + /* Always assume there is something on the queue to read. */ + tcp->inq = 1; +#ifdef GRPC_HAVE_TCP_INQ + int one = 1; + if (setsockopt(tcp->fd, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { + tcp->inq_capable = true; + } else { + gpr_log(GPR_INFO, "cannot set inq fd=%d errno=%d", tcp->fd, errno); + tcp->inq_capable = false; + } +#else + tcp->inq_capable = false; +#endif /* GRPC_HAVE_TCP_INQ */ /* Start being notified on errors if event engine can track errors. */ if (grpc_event_engine_can_track_errors()) { /* Grab a ref to tcp so that we can safely access the tcp struct when diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 43817c5a024..7b464651ea1 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -241,7 +241,7 @@ static void on_read(void* tcpp, grpc_error* error) { #define DEFAULT_TARGET_READ_SIZE 8192 #define MAX_WSABUF_COUNT 16 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_winsocket* handle = tcp->socket; grpc_winsocket_callback_info* info = &handle->read_info; diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index 14fb55884f1..2a862492bd7 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -255,7 +255,7 @@ static void on_read(void* user_data, grpc_error* error) { } static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { secure_endpoint* ep = reinterpret_cast(secure_ep); ep->read_cb = cb; ep->read_buffer = slices; @@ -269,7 +269,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, return; } - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read); + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent); } static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 5369574b854..3605bbe5974 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -283,7 +283,7 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( if (result == TSI_INCOMPLETE_DATA) { GPR_ASSERT(bytes_to_send_size == 0); grpc_endpoint_read(args_->endpoint, args_->read_buffer, - &on_handshake_data_received_from_peer_); + &on_handshake_data_received_from_peer_, /*urgent=*/true); return error; } if (result != TSI_OK) { @@ -306,7 +306,7 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. grpc_endpoint_read(args_->endpoint, args_->read_buffer, - &on_handshake_data_received_from_peer_); + &on_handshake_data_received_from_peer_, /*urgent=*/true); } else { // Handshake has finished, check peer and so on. error = CheckPeerLocked(); @@ -382,7 +382,8 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg, // We may be done. if (h->handshaker_result_ == nullptr) { grpc_endpoint_read(h->args_->endpoint, h->args_->read_buffer, - &h->on_handshake_data_received_from_peer_); + &h->on_handshake_data_received_from_peer_, + /*urgent=*/true); } else { error = h->CheckPeerLocked(); if (error != GRPC_ERROR_NONE) { diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index ae1e42a4e0d..6b492523219 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -143,7 +143,8 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags, grpc_closure read_done_closure; GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(sfd->client, &incoming, &read_done_closure); + grpc_endpoint_read(sfd->client, &incoming, &read_done_closure, + /*urgent=*/true); grpc_core::ExecCtx::Get()->Flush(); do { GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0); diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 99cfec7adf6..3701a938a3d 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -126,7 +126,8 @@ static void handle_read(void* arg, grpc_error* error) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { handle_write(); } else { - grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); } } @@ -142,7 +143,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp, state.tcp = tcp; state.incoming_data_length = 0; grpc_endpoint_add_to_pollset(tcp, server->pollset); - grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); } static gpr_timespec n_sec_deadline(int seconds) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index e6fc5dfcfca..6b5513f160e 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -271,7 +271,7 @@ static void on_client_read_done(void* arg, grpc_error* error) { } // Read more data. grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + &conn->on_client_read_done, /*urgent=*/false); } // Callback for reading data from the backend server, which will be @@ -302,7 +302,7 @@ static void on_server_read_done(void* arg, grpc_error* error) { } // Read more data. grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + &conn->on_server_read_done, /*urgent=*/false); } // Callback to write the HTTP response for the CONNECT request. @@ -323,9 +323,9 @@ static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); proxy_connection_unref(conn, "write_response"); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + &conn->on_client_read_done, /*urgent=*/false); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + &conn->on_server_read_done, /*urgent=*/false); } // Callback to connect to the backend server specified by the HTTP @@ -405,7 +405,7 @@ static void on_read_request_done(void* arg, grpc_error* error) { // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + &conn->on_read_request_done, /*urgent=*/false); return; } // Make sure we got a CONNECT request. @@ -503,7 +503,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + &conn->on_read_request_done, /*urgent=*/false); } // diff --git a/test/core/handshake/readahead_handshaker_server_ssl.cc b/test/core/handshake/readahead_handshaker_server_ssl.cc index d91f2d2fe63..c0ab61136cb 100644 --- a/test/core/handshake/readahead_handshaker_server_ssl.cc +++ b/test/core/handshake/readahead_handshaker_server_ssl.cc @@ -59,7 +59,8 @@ class ReadAheadHandshaker : public Handshaker { void DoHandshake(grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, HandshakerArgs* args) override { - grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done); + grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done, + /*urgent=*/false); } }; diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index a9e8ba86c5d..beae24769f6 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -129,7 +129,8 @@ static void read_and_write_test_read_handler(void* data, grpc_error* error) { GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { - grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read); + grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read, + /*urgent=*/false); } } @@ -216,8 +217,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&state, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); - grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read); - + grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read, + /*urgent=*/false); if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); grpc_endpoint_shutdown( @@ -282,14 +283,16 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); grpc_endpoint_read(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx)); + grpc_schedule_on_exec_ctx), + /*urgent=*/false); wait_for_fail_count(&fail_count, 0); grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); wait_for_fail_count(&fail_count, 1); grpc_endpoint_read(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx)); + grpc_schedule_on_exec_ctx), + /*urgent=*/false); wait_for_fail_count(&fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_endpoint_write(f.client_ep, &slice_buffer, diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 5b601b1ae5f..33a4d973ed3 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -191,7 +191,8 @@ static void read_cb(void* user_data, grpc_error* error) { GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb, + /*urgent=*/false); gpr_mu_unlock(g_mu); } } @@ -229,7 +230,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -280,7 +281,7 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -519,7 +520,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { diff --git a/test/core/security/secure_endpoint_test.cc b/test/core/security/secure_endpoint_test.cc index f6d02895b5f..3a2d599767a 100644 --- a/test/core/security/secure_endpoint_test.cc +++ b/test/core/security/secure_endpoint_test.cc @@ -182,7 +182,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_slice_buffer_init(&incoming); GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(f.client_ep, &incoming, &done_closure); + grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(n == 1); diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc index a9789edbf2b..32a268ed521 100644 --- a/test/core/transport/chttp2/settings_timeout_test.cc +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -133,7 +133,8 @@ class Client { grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000; while (true) { EventState state; - grpc_endpoint_read(endpoint_, &read_buffer, state.closure()); + grpc_endpoint_read(endpoint_, &read_buffer, state.closure(), + /*urgent=*/true); if (!PollUntilDone(&state, deadline)) { retval = false; break; diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index df2ee7aedfd..2f78a7f8a97 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -41,7 +41,7 @@ typedef struct mock_endpoint { } mock_endpoint; static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { mock_endpoint* m = reinterpret_cast(ep); gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 51b6de46951..2d26902fc44 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -54,7 +54,7 @@ struct passthru_endpoint { }; static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { half* m = reinterpret_cast(ep); gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index b0da735e57f..bdac1334f48 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -47,9 +47,9 @@ typedef struct { } trickle_endpoint; static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { trickle_endpoint* te = reinterpret_cast(ep); - grpc_endpoint_read(te->wrapped, slices, cb); + grpc_endpoint_read(te->wrapped, slices, cb, urgent); } static void maybe_call_write_cb_locked(trickle_endpoint* te) { diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index dcfaa684773..baa6da3fbcf 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -92,7 +92,7 @@ class DummyEndpoint : public grpc_endpoint { } static void read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { static_cast(ep)->QueueRead(slices, cb); }