Remove the `urgent` argument from iomgr tcp read API (#25494)

The `urgent` argument is a platform-specific flag that leaked into the (ideally) platform-independent HTTP/2 transport layer. In an effort to clean up the cross-platform API surface, it would be helpful if we could remove this argument from the TCP read API without losing the performance optimization that was introduced along with it (see #18240).
pull/25549/head
AJ Heller 4 years ago committed by GitHub
parent a0fc3a8dda
commit a3398f924c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  2. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 2
      src/core/lib/http/httpcli.cc
  4. 4
      src/core/lib/iomgr/endpoint.cc
  5. 5
      src/core/lib/iomgr/endpoint.h
  6. 2
      src/core/lib/iomgr/endpoint_cfstream.cc
  7. 2
      src/core/lib/iomgr/tcp_custom.cc
  8. 4
      src/core/lib/iomgr/tcp_posix.cc
  9. 2
      src/core/lib/iomgr/tcp_windows.cc
  10. 4
      src/core/lib/security/transport/secure_endpoint.cc
  11. 9
      src/core/lib/security/transport/security_handshaker.cc
  12. 3
      test/core/bad_client/bad_client.cc
  13. 9
      test/core/end2end/bad_server_response_test.cc
  14. 12
      test/core/end2end/fixtures/http_proxy_fixture.cc
  15. 3
      test/core/handshake/readahead_handshaker_server_ssl.cc
  16. 12
      test/core/iomgr/endpoint_tests.cc
  17. 8
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  18. 9
      test/core/iomgr/tcp_posix_test.cc
  19. 2
      test/core/security/secure_endpoint_test.cc
  20. 3
      test/core/transport/chttp2/settings_timeout_test.cc
  21. 2
      test/core/util/eval_args_mock_endpoint.cc
  22. 2
      test/core/util/mock_endpoint.cc
  23. 2
      test/core/util/passthru_endpoint.cc
  24. 4
      test/core/util/trickle_endpoint.cc
  25. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -160,8 +160,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
handshaker->args_->endpoint, handshaker->args_->read_buffer, handshaker->args_->endpoint, handshaker->args_->read_buffer,
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler, &HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx), handshaker, grpc_schedule_on_exec_ctx));
/*urgent=*/true);
} }
} }
@ -236,8 +235,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
handshaker->args_->endpoint, handshaker->args_->read_buffer, handshaker->args_->endpoint, handshaker->args_->read_buffer,
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler, &HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx), handshaker, grpc_schedule_on_exec_ctx));
/*urgent=*/true);
return; return;
} }
// Make sure we got a 2xx response. // Make sure we got a 2xx response.

@ -2552,10 +2552,9 @@ static void read_action_locked(void* tp, grpc_error* error) {
} }
static void continue_read_action_locked(grpc_chttp2_transport* t) { static void continue_read_action_locked(grpc_chttp2_transport* t) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t, GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
} }

@ -124,7 +124,7 @@ static void append_error(internal_request* req, grpc_error* error) {
} }
static void do_read(internal_request* req) { static void do_read(internal_request* req) {
grpc_endpoint_read(req->ep, &req->incoming, &req->on_read, /*urgent=*/true); grpc_endpoint_read(req->ep, &req->incoming, &req->on_read);
} }
static void on_read(void* user_data, grpc_error* error) { static void on_read(void* user_data, grpc_error* error) {

@ -23,8 +23,8 @@
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
ep->vtable->read(ep, slices, cb, urgent); ep->vtable->read(ep, slices, cb);
} }
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,

@ -37,8 +37,7 @@ typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
struct grpc_endpoint_vtable { struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
bool urgent);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
void* arg); void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
@ -59,7 +58,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even when the callback is Valid slices may be placed into \a slices even when the callback is
invoked with error != GRPC_ERROR_NONE. */ invoked with error != GRPC_ERROR_NONE. */
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent); grpc_closure* cb);
absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep); absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep);

@ -254,7 +254,7 @@ static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
} }
static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) { if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,

@ -197,7 +197,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
} }
static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb, bool /*urgent*/) { grpc_closure* cb) {
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
GPR_ASSERT(tcp->read_cb == nullptr); GPR_ASSERT(tcp->read_cb == nullptr);

@ -904,7 +904,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
} }
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(tcp->read_cb == nullptr); GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb; tcp->read_cb = cb;
@ -917,7 +917,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* the polling engine */ * the polling engine */
tcp->is_first_read = false; tcp->is_first_read = false;
notify_on_read(tcp); notify_on_read(tcp);
} else if (!urgent && tcp->inq == 0) { } else if (tcp->inq == 0) {
/* Upper layer asked to read more but we know there is no pending data /* Upper layer asked to read more but we know there is no pending data
* to read from previous reads. So, wait for POLLIN. * to read from previous reads. So, wait for POLLIN.
*/ */

@ -239,7 +239,7 @@ static void on_read(void* tcpp, grpc_error* error) {
#define DEFAULT_TARGET_READ_SIZE 8192 #define DEFAULT_TARGET_READ_SIZE 8192
#define MAX_WSABUF_COUNT 16 #define MAX_WSABUF_COUNT 16
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep; grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* handle = tcp->socket; grpc_winsocket* handle = tcp->socket;
grpc_winsocket_callback_info* info = &handle->read_info; grpc_winsocket_callback_info* info = &handle->read_info;

@ -255,7 +255,7 @@ static void on_read(void* user_data, grpc_error* error) {
} }
static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep); secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_cb = cb; ep->read_cb = cb;
ep->read_buffer = slices; ep->read_buffer = slices;
@ -269,7 +269,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return; return;
} }
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent); grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read);
} }
static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,

@ -298,8 +298,7 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&on_handshake_data_received_from_peer_, &on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx), this, grpc_schedule_on_exec_ctx));
/*urgent=*/true);
return error; return error;
} }
if (result != TSI_OK) { if (result != TSI_OK) {
@ -331,8 +330,7 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&on_handshake_data_received_from_peer_, &on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx), this, grpc_schedule_on_exec_ctx));
/*urgent=*/true);
} else { } else {
// Handshake has finished, check peer and so on. // Handshake has finished, check peer and so on.
error = CheckPeerLocked(); error = CheckPeerLocked();
@ -438,8 +436,7 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg,
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&h->on_handshake_data_received_from_peer_, &h->on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
h.get(), grpc_schedule_on_exec_ctx), h.get(), grpc_schedule_on_exec_ctx));
/*urgent=*/true);
} else { } else {
error = h->CheckPeerLocked(); error = h->CheckPeerLocked();
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {

@ -143,8 +143,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_closure read_done_closure; grpc_closure read_done_closure;
GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event, GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(sfd->client, &incoming, &read_done_closure, grpc_endpoint_read(sfd->client, &incoming, &read_done_closure);
/*urgent=*/true);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
do { do {
GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0); GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0);

@ -102,8 +102,7 @@ static void done_write(void* /*arg*/, grpc_error* error) {
static void done_writing_settings_frame(void* /* arg */, grpc_error* error) { static void done_writing_settings_frame(void* /* arg */, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
/*urgent=*/false);
} }
static void handle_write() { static void handle_write() {
@ -140,8 +139,7 @@ static void handle_read(void* /*arg*/, grpc_error* error) {
!state.http2_response) { !state.http2_response) {
handle_write(); handle_write();
} else { } else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
/*urgent=*/false);
} }
} }
@ -168,8 +166,7 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
grpc_endpoint_write(state.tcp, &state.outgoing_buffer, grpc_endpoint_write(state.tcp, &state.outgoing_buffer,
&on_writing_settings_frame, nullptr); &on_writing_settings_frame, nullptr);
} else { } else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
/*urgent=*/false);
} }
} }

@ -308,7 +308,7 @@ static void on_client_read_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false); &conn->on_client_read_done);
} }
static void on_client_read_done(void* arg, grpc_error* error) { static void on_client_read_done(void* arg, grpc_error* error) {
@ -350,7 +350,7 @@ static void on_server_read_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false); &conn->on_server_read_done);
} }
static void on_server_read_done(void* arg, grpc_error* error) { static void on_server_read_done(void* arg, grpc_error* error) {
@ -380,11 +380,11 @@ static void on_write_response_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false); &conn->on_client_read_done);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false); &conn->on_server_read_done);
} }
static void on_write_response_done(void* arg, grpc_error* error) { static void on_write_response_done(void* arg, grpc_error* error) {
@ -484,7 +484,7 @@ static void on_read_request_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false); &conn->on_read_request_done);
return; return;
} }
// Make sure we got a CONNECT request. // Make sure we got a CONNECT request.
@ -579,7 +579,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false); &conn->on_read_request_done);
} }
// //

@ -59,8 +59,7 @@ class ReadAheadHandshaker : public Handshaker {
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done, grpc_closure* on_handshake_done,
HandshakerArgs* args) override { HandshakerArgs* args) override {
grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done, grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done);
/*urgent=*/false);
} }
}; };

@ -121,8 +121,7 @@ struct read_and_write_test_state {
static void read_scheduler(void* data, grpc_error* /* error */) { static void read_scheduler(void* data, grpc_error* /* error */) {
struct read_and_write_test_state* state = struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data); static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read, grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read);
/*urgent=*/false);
} }
static void read_and_write_test_read_handler(void* data, grpc_error* error) { static void read_and_write_test_read_handler(void* data, grpc_error* error) {
@ -243,8 +242,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
read_and_write_test_write_handler(&state, GRPC_ERROR_NONE); read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read, grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read);
/*urgent=*/false);
if (shutdown) { if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read"); gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown( grpc_endpoint_shutdown(
@ -309,16 +307,14 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_read(f.client_ep, &slice_buffer, grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx), grpc_schedule_on_exec_ctx));
/*urgent=*/false);
wait_for_fail_count(&fail_count, 0); wait_for_fail_count(&fail_count, 0);
grpc_endpoint_shutdown(f.client_ep, grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
wait_for_fail_count(&fail_count, 1); wait_for_fail_count(&fail_count, 1);
grpc_endpoint_read(f.client_ep, &slice_buffer, grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx), grpc_schedule_on_exec_ctx));
/*urgent=*/false);
wait_for_fail_count(&fail_count, 2); wait_for_fail_count(&fail_count, 2);
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer, grpc_endpoint_write(f.client_ep, &slice_buffer,

@ -187,7 +187,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_init(&read_one_slice); grpc_slice_buffer_init(&read_one_slice);
while (read_slices.length < kBufferSize) { while (read_slices.length < kBufferSize) {
init_event_closure(&read_done, &read); init_event_closure(&read_done, &read);
grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false); grpc_endpoint_read(ep_, &read_one_slice, &read_done);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES); XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE); XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
grpc_slice_buffer_move_into(&read_one_slice, &read_slices); grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
@ -218,7 +218,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_init(&read_slices); grpc_slice_buffer_init(&read_slices);
init_event_closure(&read_done, &read); init_event_closure(&read_done, &read);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_endpoint_read(ep_, &read_slices, &read_done);
grpc_slice_buffer_init(&write_slices); grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
@ -267,7 +267,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
init_event_closure(&read_done, &read); init_event_closure(&read_done, &read);
grpc_slice_buffer_init(&read_slices); grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_endpoint_read(ep_, &read_slices, &read_done);
grpc_slice_buffer_init(&write_slices); grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
@ -306,7 +306,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
init_event_closure(&read_done, &read); init_event_closure(&read_done, &read);
grpc_slice_buffer_init(&read_slices); grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_endpoint_read(ep_, &read_slices, &read_done);
struct linger so_linger; struct linger so_linger;
so_linger.l_onoff = 1; so_linger.l_onoff = 1;

@ -193,8 +193,7 @@ static void read_cb(void* user_data, grpc_error* error) {
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
} else { } else {
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb, grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
/*urgent=*/false);
} }
} }
@ -231,7 +230,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming); grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) { while (state.read_bytes < state.target_read_bytes) {
@ -282,7 +281,7 @@ static void large_read_test(size_t slice_size) {
grpc_slice_buffer_init(&state.incoming); grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) { while (state.read_bytes < state.target_read_bytes) {
@ -521,7 +520,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming); grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false); grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) { while (state.read_bytes < state.target_read_bytes) {

@ -182,7 +182,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
grpc_slice_buffer_init(&incoming); grpc_slice_buffer_init(&incoming);
GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false); grpc_endpoint_read(f.client_ep, &incoming, &done_closure);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(n == 1); GPR_ASSERT(n == 1);

@ -137,8 +137,7 @@ class Client {
grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000; grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
while (true) { while (true) {
EventState state; EventState state;
grpc_endpoint_read(endpoint_, &read_buffer, state.closure(), grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
/*urgent=*/true);
if (!PollUntilDone(&state, deadline)) { if (!PollUntilDone(&state, deadline)) {
retval = false; retval = false;
break; break;

@ -37,7 +37,7 @@ class EvalArgsMockEndpoint {
} }
grpc_endpoint* base() const { return const_cast<grpc_endpoint*>(&base_); } grpc_endpoint* base() const { return const_cast<grpc_endpoint*>(&base_); }
static void Read(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/, static void Read(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/,
grpc_closure* /*cb*/, bool /*unused*/) {} grpc_closure* /*cb*/) {}
static void Write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/, static void Write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/,
grpc_closure* /*cb*/, void* /*unused*/) {} grpc_closure* /*cb*/, void* /*unused*/) {}
static void AddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*unused*/) {} static void AddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*unused*/) {}

@ -45,7 +45,7 @@ typedef struct mock_endpoint {
} mock_endpoint; } mock_endpoint;
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) { grpc_closure* cb) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
gpr_mu_lock(&m->mu); gpr_mu_lock(&m->mu);
if (m->read_buffer.count > 0) { if (m->read_buffer.count > 0) {

@ -58,7 +58,7 @@ struct passthru_endpoint {
}; };
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) { grpc_closure* cb) {
half* m = reinterpret_cast<half*>(ep); half* m = reinterpret_cast<half*>(ep);
gpr_mu_lock(&m->parent->mu); gpr_mu_lock(&m->parent->mu);
if (m->parent->shutdown) { if (m->parent->shutdown) {

@ -47,9 +47,9 @@ typedef struct {
} trickle_endpoint; } trickle_endpoint;
static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices, static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) { grpc_closure* cb) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep); trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
grpc_endpoint_read(te->wrapped, slices, cb, urgent); grpc_endpoint_read(te->wrapped, slices, cb);
} }
static void maybe_call_write_cb_locked(trickle_endpoint* te) { static void maybe_call_write_cb_locked(trickle_endpoint* te) {

@ -93,7 +93,7 @@ class PhonyEndpoint : public grpc_endpoint {
} }
static void read(grpc_endpoint* ep, grpc_slice_buffer* slices, static void read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) { grpc_closure* cb) {
static_cast<PhonyEndpoint*>(ep)->QueueRead(slices, cb); static_cast<PhonyEndpoint*>(ep)->QueueRead(slices, cb);
} }

Loading…
Cancel
Save