From 339e421b29dc5a1369367ce6863a76ba8f3eda71 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 10 May 2017 12:52:45 -0700 Subject: [PATCH] Change endpoint interface to declare poller coveredness --- .../client_channel/http_connect_handshaker.c | 6 +-- .../chttp2/transport/chttp2_transport.c | 4 +- src/core/lib/http/httpcli.c | 5 ++- src/core/lib/iomgr/endpoint.c | 10 +++-- src/core/lib/iomgr/endpoint.h | 12 ++++-- src/core/lib/iomgr/tcp_posix.c | 37 ++++++++++++++++--- .../lib/security/transport/secure_endpoint.c | 11 ++++-- .../security/transport/security_handshaker.c | 6 +-- test/core/bad_client/bad_client.c | 5 ++- test/core/end2end/bad_server_response_test.c | 8 ++-- .../end2end/fixtures/http_proxy_fixture.c | 23 ++++++------ test/core/iomgr/endpoint_tests.c | 12 +++--- test/core/iomgr/tcp_posix_test.c | 11 +++--- test/core/security/secure_endpoint_test.c | 2 +- test/core/util/mock_endpoint.c | 6 ++- test/core/util/passthru_endpoint.c | 6 ++- test/core/util/trickle_endpoint.c | 10 +++-- 17 files changed, 110 insertions(+), 64 deletions(-) diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c index c09a863d008..5f6ea71de45 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.c +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c @@ -151,7 +151,7 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, // Otherwise, read the response. // The read callback inherits our ref to the handshaker. grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, - handshaker->args->read_buffer, + handshaker->args->read_buffer, true, &handshaker->response_read_closure); gpr_mu_unlock(&handshaker->mu); } @@ -215,7 +215,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_reset_and_unref_internal(exec_ctx, handshaker->args->read_buffer); grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, - handshaker->args->read_buffer, + handshaker->args->read_buffer, true, &handshaker->response_read_closure); gpr_mu_unlock(&handshaker->mu); return; @@ -338,7 +338,7 @@ static void http_connect_handshaker_do_handshake( gpr_free(header_strings); // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); - grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer, + grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer, true, &handshaker->request_done_closure); gpr_mu_unlock(&handshaker->mu); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 30738080ee7..b7f4d4f4d69 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -909,7 +909,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( - exec_ctx, t->ep, &t->outbuf, + exec_ctx, t->ep, &t->outbuf, true, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, grpc_combiner_scheduler(t->combiner, false))); GPR_TIMER_END("write_action", 0); @@ -2267,7 +2267,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, true, &t->read_action_locked); if (t->enable_bdp_probe) { diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 0ac2c2ad52a..1d9f45f906d 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -139,7 +139,7 @@ static void append_error(internal_request *req, grpc_error *error) { } static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) { - grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read); + grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, true, &req->on_read); } static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, @@ -184,7 +184,8 @@ static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void start_write(grpc_exec_ctx *exec_ctx, internal_request *req) { grpc_slice_ref_internal(req->request_text); grpc_slice_buffer_add(&req->outgoing, req->request_text); - grpc_endpoint_write(exec_ctx, req->ep, &req->outgoing, &req->done_write); + grpc_endpoint_write(exec_ctx, req->ep, &req->outgoing, true, + &req->done_write); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index bf6e98146a4..d9c90ec46b0 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -34,13 +34,15 @@ #include "src/core/lib/iomgr/endpoint.h" void grpc_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { - ep->vtable->read(exec_ctx, ep, slices, cb); + grpc_slice_buffer* slices, bool covered_by_poller, + grpc_closure* cb) { + ep->vtable->read(exec_ctx, ep, slices, covered_by_poller, cb); } void grpc_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { - ep->vtable->write(exec_ctx, ep, slices, cb); + grpc_slice_buffer* slices, bool covered_by_poller, + grpc_closure* cb) { + ep->vtable->write(exec_ctx, ep, slices, covered_by_poller, cb); } void grpc_endpoint_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 740357ecc54..c97cacb22f2 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -49,9 +49,11 @@ typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; struct grpc_endpoint_vtable { void (*read)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb); grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); @@ -70,7 +72,8 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even when the callback is invoked with error != GRPC_ERROR_NONE. */ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb); char *grpc_endpoint_get_peer(grpc_endpoint *ep); @@ -92,7 +95,8 @@ grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); it is a valid slice buffer. */ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb); /* Causes any pending and future read/write callbacks to run immediately with success==0 */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 5f4b38de2b9..86048c436d4 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -81,6 +81,8 @@ typedef struct { grpc_fd *em_fd; int fd; bool finished_edge; + bool read_covered_by_poller; + bool write_covered_by_poller; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ double target_length; double bytes_read_this_round; @@ -114,6 +116,17 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static void call_notify_function_and_maybe_arrange_poller( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool covered_by_poller, + grpc_closure *closure, + void (*notify_func)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *closure)) { + notify_func(exec_ctx, fd, closure); + if (!covered_by_poller) { + abort(); + } +} + static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } @@ -276,7 +289,9 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + call_notify_function_and_maybe_arrange_poller( + exec_ctx, tcp->em_fd, tcp->read_covered_by_poller, &tcp->read_closure, + grpc_fd_notify_on_read); } else { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -351,17 +366,21 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *incoming_buffer, grpc_closure *cb) { + grpc_slice_buffer *incoming_buffer, bool covered_by_poller, + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; + tcp->read_covered_by_poller = covered_by_poller; tcp->incoming_buffer = incoming_buffer; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + call_notify_function_and_maybe_arrange_poller( + exec_ctx, tcp->em_fd, tcp->read_covered_by_poller, &tcp->read_closure, + grpc_fd_notify_on_read); } else { grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); } @@ -471,7 +490,9 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + call_notify_function_and_maybe_arrange_poller( + exec_ctx, tcp->em_fd, tcp->write_covered_by_poller, &tcp->write_closure, + grpc_fd_notify_on_write); } else { cb = tcp->write_cb; tcp->write_cb = NULL; @@ -486,7 +507,8 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *buf, grpc_closure *cb) { + grpc_slice_buffer *buf, bool covered_by_poller, + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_error *error = GRPC_ERROR_NONE; @@ -517,6 +539,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; + tcp->write_covered_by_poller = covered_by_poller; if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); @@ -524,7 +547,9 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + call_notify_function_and_maybe_arrange_poller( + exec_ctx, tcp->em_fd, tcp->write_covered_by_poller, &tcp->write_closure, + grpc_fd_notify_on_write); } else { if (grpc_tcp_trace) { const char *str = grpc_error_string(error); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 0d5c7432c64..ea1bd80eb26 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -231,7 +231,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, } static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { secure_endpoint *ep = (secure_endpoint *)secure_ep; ep->read_cb = cb; ep->read_buffer = slices; @@ -246,7 +247,7 @@ static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, } grpc_endpoint_read(exec_ctx, ep->wrapped_ep, &ep->source_buffer, - &ep->on_read); + covered_by_poller, &ep->on_read); } static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur, @@ -258,7 +259,8 @@ static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur, } static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0); unsigned i; @@ -340,7 +342,8 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, return; } - grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, + covered_by_poller, cb); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 509b4b556d6..85b47e038da 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -224,7 +224,7 @@ static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, true, &h->on_handshake_data_sent_to_peer); return GRPC_ERROR_NONE; } @@ -256,7 +256,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, - &h->on_handshake_data_received_from_peer); + true, &h->on_handshake_data_received_from_peer); goto done; } else { error = send_handshake_bytes_to_peer_locked(exec_ctx, h); @@ -323,7 +323,7 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, } /* We may be done. */ if (tsi_handshaker_is_in_progress(h->handshaker)) { - grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, true, &h->on_handshake_data_received_from_peer); } else { error = check_peer_locked(exec_ctx, h); diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 8dbc5aa8612..fd143a0c4cb 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -153,7 +153,8 @@ void grpc_run_bad_client_test( grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, &done_write_closure); + grpc_endpoint_write(&exec_ctx, sfd.client, &outgoing, true, + &done_write_closure); grpc_exec_ctx_finish(&exec_ctx); /* Await completion */ @@ -181,7 +182,7 @@ void grpc_run_bad_client_test( grpc_closure read_done_closure; grpc_closure_init(&read_done_closure, read_done, &args, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, + grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, true, &read_done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT( diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index fe7e674d17c..f6897ef8c6c 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -120,7 +120,8 @@ static void handle_write(grpc_exec_ctx *exec_ctx) { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, &on_write); + grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, true, + &on_write); } static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -142,7 +143,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { handle_write(exec_ctx); } else { - grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, true, &on_read); } } @@ -159,7 +160,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, state.tcp = tcp; state.incoming_data_length = 0; grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); - grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, true, + &on_read); } static gpr_timespec n_sec_deadline(int seconds) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index f0d09487c62..a095f98c4cb 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -170,7 +170,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } else { // No more writes. Unref the connection. @@ -195,7 +195,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } else { // No more writes. Unref the connection. @@ -227,12 +227,12 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); grpc_endpoint_write(exec_ctx, conn->server_endpoint, - &conn->server_write_buffer, + &conn->server_write_buffer, true, &conn->on_server_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); } // Callback for reading data from the backend server, which will be @@ -259,12 +259,12 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_client_write_done); } // Read more data. grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to write the HTTP response for the CONNECT request. @@ -285,9 +285,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection_ref(conn, "server_read"); proxy_connection_unref(exec_ctx, conn, "write_response"); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done); + true, &conn->on_client_read_done); grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done); + true, &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP @@ -312,7 +312,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); grpc_endpoint_write(exec_ctx, conn->client_endpoint, - &conn->client_write_buffer, + &conn->client_write_buffer, true, &conn->on_write_response_done); } @@ -349,7 +349,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { grpc_endpoint_read(exec_ctx, conn->client_endpoint, - &conn->client_read_buffer, &conn->on_read_request_done); + &conn->client_read_buffer, true, + &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. @@ -422,7 +423,7 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + true, &conn->on_read_request_done); } // diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index e274796e237..8ee94f27b0a 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -143,7 +143,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { - grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, + grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, true, &state->done_read); } } @@ -165,7 +165,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, + grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, true, &state->done_write); gpr_free(slices); return; @@ -228,7 +228,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); - grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, + grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, true, &state.done_read); if (shutdown) { @@ -296,19 +296,19 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 0); grpc_endpoint_shutdown(&exec_ctx, f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 1); - grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); - grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, true, grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 3); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 2c53a003d23..f1d1bd409a5 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -164,7 +164,8 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, if (state->read_bytes >= state->target_read_bytes) { gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, true, + &state->read_cb); gpr_mu_unlock(g_mu); } } @@ -200,7 +201,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -252,7 +253,7 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { @@ -393,7 +394,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_closure_init(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); + grpc_endpoint_write(&exec_ctx, ep, &outgoing, true, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(g_mu); for (;;) { @@ -463,7 +464,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, true, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 71d8057ac31..59c99b81f93 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -162,7 +162,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_slice_buffer_init(&incoming); grpc_closure_init(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, &done_closure); + grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, true, &done_closure); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(n == 1); GPR_ASSERT(incoming.count == 1); diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index c747297984b..d55e82cb5d5 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -56,7 +56,8 @@ typedef struct grpc_mock_endpoint { } grpc_mock_endpoint; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { @@ -70,7 +71,8 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 6400845d235..bce0924e1bd 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -71,7 +71,8 @@ struct passthru_endpoint { }; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { @@ -93,7 +94,8 @@ static half *other_half(half *h) { } static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { half *m = other_half((half *)ep); gpr_mu_lock(&m->parent->mu); grpc_error *error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 69386a07181..a8b6c5b52ca 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -61,9 +61,10 @@ typedef struct { } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; - grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); + grpc_endpoint_read(exec_ctx, te->wrapped, slices, covered_by_poller, cb); } static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, @@ -76,7 +77,8 @@ static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, } static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, bool covered_by_poller, + grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == NULL); @@ -201,7 +203,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, te->writing = true; te->last_write = now; grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, + exec_ctx, te->wrapped, &te->writing_buffer, true, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); maybe_call_write_cb_locked(exec_ctx, te); }