From 0e3a02e9039474c4254aadb08410992e00bd6291 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Mon, 1 Mar 2021 22:04:22 -0800 Subject: [PATCH] Revert "Remove the `urgent` argument from iomgr tcp read API (#25494)" (#25592) This reverts commit a3398f9. Justification: see b/181367644. tl;dr: assuming urgent==false does not hold in all situations. --- .../client_channel/http_connect_handshaker.cc | 6 ++++-- .../transport/chttp2/transport/chttp2_transport.cc | 3 ++- src/core/lib/http/httpcli.cc | 2 +- src/core/lib/iomgr/endpoint.cc | 4 ++-- src/core/lib/iomgr/endpoint.h | 5 +++-- src/core/lib/iomgr/endpoint_cfstream.cc | 2 +- src/core/lib/iomgr/tcp_custom.cc | 2 +- src/core/lib/iomgr/tcp_posix.cc | 4 ++-- src/core/lib/iomgr/tcp_windows.cc | 2 +- src/core/lib/security/transport/secure_endpoint.cc | 4 ++-- .../lib/security/transport/security_handshaker.cc | 9 ++++++--- test/core/bad_client/bad_client.cc | 3 ++- test/core/end2end/bad_server_response_test.cc | 9 ++++++--- test/core/end2end/fixtures/http_proxy_fixture.cc | 12 ++++++------ .../handshake/readahead_handshaker_server_ssl.cc | 3 ++- test/core/iomgr/endpoint_tests.cc | 12 ++++++++---- .../iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm | 8 ++++---- test/core/iomgr/tcp_posix_test.cc | 9 +++++---- test/core/security/secure_endpoint_test.cc | 2 +- test/core/transport/chttp2/settings_timeout_test.cc | 3 ++- test/core/util/eval_args_mock_endpoint.cc | 2 +- test/core/util/mock_endpoint.cc | 2 +- test/core/util/passthru_endpoint.cc | 2 +- test/core/util/trickle_endpoint.cc | 4 ++-- test/cpp/microbenchmarks/bm_chttp2_transport.cc | 2 +- 25 files changed, 67 insertions(+), 49 deletions(-) diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index c1c8f8cddbf..8880806440d 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -160,7 +160,8 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) { handshaker->args_->endpoint, handshaker->args_->read_buffer, GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, &HttpConnectHandshaker::OnReadDoneScheduler, - handshaker, grpc_schedule_on_exec_ctx)); + handshaker, grpc_schedule_on_exec_ctx), + /*urgent=*/true); } } @@ -235,7 +236,8 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) { handshaker->args_->endpoint, handshaker->args_->read_buffer, GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, &HttpConnectHandshaker::OnReadDoneScheduler, - handshaker, grpc_schedule_on_exec_ctx)); + handshaker, grpc_schedule_on_exec_ctx), + /*urgent=*/true); return; } // Make sure we got a 2xx response. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7aec6c21303..35c0512cada 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -2552,9 +2552,10 @@ static void read_action_locked(void* tp, grpc_error* error) { } static void continue_read_action_locked(grpc_chttp2_transport* t) { + const bool urgent = t->goaway_error != GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); } diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 9c15dc63e79..8d024dd3ac5 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -124,7 +124,7 @@ static void append_error(internal_request* req, grpc_error* error) { } static void do_read(internal_request* req) { - grpc_endpoint_read(req->ep, &req->incoming, &req->on_read); + grpc_endpoint_read(req->ep, &req->incoming, &req->on_read, /*urgent=*/true); } static void on_read(void* user_data, grpc_error* error) { diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 81a83d0a776..8954f9dc066 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -23,8 +23,8 @@ grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - ep->vtable->read(ep, slices, cb); + grpc_closure* cb, bool urgent) { + ep->vtable->read(ep, slices, cb, urgent); } void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 06dc6ed4cc2..b6e6086f14f 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -37,7 +37,8 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; struct grpc_endpoint_vtable { - void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); + void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, + bool urgent); void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* arg); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); @@ -58,7 +59,7 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even when the callback is invoked with error != GRPC_ERROR_NONE. */ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb); + grpc_closure* cb, bool urgent); absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep); diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index 62358539ea0..75cd0f09d1a 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -254,7 +254,7 @@ static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { } static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index dddbefc2dc3..0f45c75c0d5 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -197,7 +197,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { } static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, - grpc_closure* cb) { + grpc_closure* cb, bool /*urgent*/) { custom_tcp_endpoint* tcp = reinterpret_cast(ep); GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == nullptr); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index c5c7e596179..6e1a29372cb 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -904,7 +904,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { } static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { grpc_tcp* tcp = reinterpret_cast(ep); GPR_ASSERT(tcp->read_cb == nullptr); tcp->read_cb = cb; @@ -917,7 +917,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, * the polling engine */ tcp->is_first_read = false; notify_on_read(tcp); - } else if (tcp->inq == 0) { + } else if (!urgent && tcp->inq == 0) { /* Upper layer asked to read more but we know there is no pending data * to read from previous reads. So, wait for POLLIN. */ diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index d40e9e99423..d28a54ccf43 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -239,7 +239,7 @@ static void on_read(void* tcpp, grpc_error* error) { #define DEFAULT_TARGET_READ_SIZE 8192 #define MAX_WSABUF_COUNT 16 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_winsocket* handle = tcp->socket; grpc_winsocket_callback_info* info = &handle->read_info; diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index e8936c0da9d..7ab4d7b779c 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -255,7 +255,7 @@ static void on_read(void* user_data, grpc_error* error) { } static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { secure_endpoint* ep = reinterpret_cast(secure_ep); ep->read_cb = cb; ep->read_buffer = slices; @@ -269,7 +269,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, return; } - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read); + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent); } static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index e4f9759683f..9e0e4032aa5 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -296,7 +296,8 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( GRPC_CLOSURE_INIT( &on_handshake_data_received_from_peer_, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, - this, grpc_schedule_on_exec_ctx)); + this, grpc_schedule_on_exec_ctx), + /*urgent=*/true); return error; } if (result != TSI_OK) { @@ -328,7 +329,8 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( GRPC_CLOSURE_INIT( &on_handshake_data_received_from_peer_, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, - this, grpc_schedule_on_exec_ctx)); + this, grpc_schedule_on_exec_ctx), + /*urgent=*/true); } else { // Handshake has finished, check peer and so on. error = CheckPeerLocked(); @@ -434,7 +436,8 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg, GRPC_CLOSURE_INIT( &h->on_handshake_data_received_from_peer_, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, - h.get(), grpc_schedule_on_exec_ctx)); + h.get(), grpc_schedule_on_exec_ctx), + /*urgent=*/true); } else { error = h->CheckPeerLocked(); if (error != GRPC_ERROR_NONE) { diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 05117cced08..000215edf2d 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -143,7 +143,8 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags, grpc_closure read_done_closure; GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(sfd->client, &incoming, &read_done_closure); + grpc_endpoint_read(sfd->client, &incoming, &read_done_closure, + /*urgent=*/true); grpc_core::ExecCtx::Get()->Flush(); do { GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0); diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 8339faf90d4..cb5427fe837 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -102,7 +102,8 @@ static void done_write(void* /*arg*/, grpc_error* error) { static void done_writing_settings_frame(void* /* arg */, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); } static void handle_write() { @@ -139,7 +140,8 @@ static void handle_read(void* /*arg*/, grpc_error* error) { !state.http2_response) { handle_write(); } else { - grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); } } @@ -166,7 +168,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_writing_settings_frame, nullptr); } else { - grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); } } diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index a5368a90774..a5c3353ea2f 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -308,7 +308,7 @@ static void on_client_read_done_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + &conn->on_client_read_done, /*urgent=*/false); } static void on_client_read_done(void* arg, grpc_error* error) { @@ -350,7 +350,7 @@ static void on_server_read_done_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + &conn->on_server_read_done, /*urgent=*/false); } static void on_server_read_done(void* arg, grpc_error* error) { @@ -380,11 +380,11 @@ static void on_write_response_done_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + &conn->on_client_read_done, /*urgent=*/false); GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + &conn->on_server_read_done, /*urgent=*/false); } static void on_write_response_done(void* arg, grpc_error* error) { @@ -484,7 +484,7 @@ static void on_read_request_done_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + &conn->on_read_request_done, /*urgent=*/false); return; } // Make sure we got a CONNECT request. @@ -579,7 +579,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + &conn->on_read_request_done, /*urgent=*/false); } // diff --git a/test/core/handshake/readahead_handshaker_server_ssl.cc b/test/core/handshake/readahead_handshaker_server_ssl.cc index 15319deb80d..708b316e793 100644 --- a/test/core/handshake/readahead_handshaker_server_ssl.cc +++ b/test/core/handshake/readahead_handshaker_server_ssl.cc @@ -59,7 +59,8 @@ class ReadAheadHandshaker : public Handshaker { void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, grpc_closure* on_handshake_done, HandshakerArgs* args) override { - grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done); + grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done, + /*urgent=*/false); } }; diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index 93c51c97989..1a16e843730 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -121,7 +121,8 @@ struct read_and_write_test_state { static void read_scheduler(void* data, grpc_error* /* error */) { struct read_and_write_test_state* state = static_cast(data); - grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read); + grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read, + /*urgent=*/false); } static void read_and_write_test_read_handler(void* data, grpc_error* error) { @@ -242,7 +243,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&state, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); - grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read); + grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read, + /*urgent=*/false); if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); grpc_endpoint_shutdown( @@ -307,14 +309,16 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); grpc_endpoint_read(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx)); + grpc_schedule_on_exec_ctx), + /*urgent=*/false); wait_for_fail_count(&fail_count, 0); grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); wait_for_fail_count(&fail_count, 1); grpc_endpoint_read(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx)); + grpc_schedule_on_exec_ctx), + /*urgent=*/false); wait_for_fail_count(&fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_endpoint_write(f.client_ep, &slice_buffer, diff --git a/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm index 78e6f962633..5cd0d693102 100644 --- a/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm +++ b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm @@ -187,7 +187,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch grpc_slice_buffer_init(&read_one_slice); while (read_slices.length < kBufferSize) { init_event_closure(&read_done, &read); - grpc_endpoint_read(ep_, &read_one_slice, &read_done); + grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false); XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES); XCTAssertEqual(reinterpret_cast(read), GRPC_ERROR_NONE); grpc_slice_buffer_move_into(&read_one_slice, &read_slices); @@ -218,7 +218,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch grpc_slice_buffer_init(&read_slices); init_event_closure(&read_done, &read); - grpc_endpoint_read(ep_, &read_slices, &read_done); + grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_slice_buffer_init(&write_slices); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); @@ -267,7 +267,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch init_event_closure(&read_done, &read); grpc_slice_buffer_init(&read_slices); - grpc_endpoint_read(ep_, &read_slices, &read_done); + grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_slice_buffer_init(&write_slices); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); @@ -306,7 +306,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch init_event_closure(&read_done, &read); grpc_slice_buffer_init(&read_slices); - grpc_endpoint_read(ep_, &read_slices, &read_done); + grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); struct linger so_linger; so_linger.l_onoff = 1; diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index fefa4141b9b..3d995eedc11 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -193,7 +193,8 @@ static void read_cb(void* user_data, grpc_error* error) { gpr_mu_unlock(g_mu); } else { gpr_mu_unlock(g_mu); - grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb, + /*urgent=*/false); } } @@ -230,7 +231,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -281,7 +282,7 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -520,7 +521,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { diff --git a/test/core/security/secure_endpoint_test.cc b/test/core/security/secure_endpoint_test.cc index 19d16b53255..50c3694d772 100644 --- a/test/core/security/secure_endpoint_test.cc +++ b/test/core/security/secure_endpoint_test.cc @@ -182,7 +182,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_slice_buffer_init(&incoming); GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(f.client_ep, &incoming, &done_closure); + grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(n == 1); diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc index 65d6b7d71d8..81642315dec 100644 --- a/test/core/transport/chttp2/settings_timeout_test.cc +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -137,7 +137,8 @@ class Client { grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000; while (true) { EventState state; - grpc_endpoint_read(endpoint_, &read_buffer, state.closure()); + grpc_endpoint_read(endpoint_, &read_buffer, state.closure(), + /*urgent=*/true); if (!PollUntilDone(&state, deadline)) { retval = false; break; diff --git a/test/core/util/eval_args_mock_endpoint.cc b/test/core/util/eval_args_mock_endpoint.cc index e82db70f385..6f0de72dd3d 100644 --- a/test/core/util/eval_args_mock_endpoint.cc +++ b/test/core/util/eval_args_mock_endpoint.cc @@ -37,7 +37,7 @@ class EvalArgsMockEndpoint { } grpc_endpoint* base() const { return const_cast(&base_); } static void Read(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/, - grpc_closure* /*cb*/) {} + grpc_closure* /*cb*/, bool /*unused*/) {} static void Write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/, grpc_closure* /*cb*/, void* /*unused*/) {} static void AddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*unused*/) {} diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index a5afef32645..b62613b8d34 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -45,7 +45,7 @@ typedef struct mock_endpoint { } mock_endpoint; static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool /*urgent*/) { mock_endpoint* m = reinterpret_cast(ep); gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index d64b9cc0b0c..f856481bc65 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -58,7 +58,7 @@ struct passthru_endpoint { }; static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool /*urgent*/) { half* m = reinterpret_cast(ep); gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index b5bb08441fc..603b51ab80c 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -47,9 +47,9 @@ typedef struct { } trickle_endpoint; static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool urgent) { trickle_endpoint* te = reinterpret_cast(ep); - grpc_endpoint_read(te->wrapped, slices, cb); + grpc_endpoint_read(te->wrapped, slices, cb, urgent); } static void maybe_call_write_cb_locked(trickle_endpoint* te) { diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 3cd311f2bee..45144017d5b 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -93,7 +93,7 @@ class PhonyEndpoint : public grpc_endpoint { } static void read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, bool /*urgent*/) { static_cast(ep)->QueueRead(slices, cb); }