Adding a min progress size argument to grpc_endpoint_read to allow gRPC to use TCP optimizations on the read path (#29503)

* adding a min progress size argument to grpc_endpoint_read

* fix missing argument error

* adding a static_cast

* reverting changes in tcp_posix.cc

* add missing changes to CFStreamEndpointTests.mm
pull/29521/head
Vignesh Babu 3 years ago committed by GitHub
parent 6d480e9d34
commit 53e382729f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  2. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 3
      src/core/lib/http/httpcli.h
  4. 4
      src/core/lib/iomgr/endpoint.cc
  5. 4
      src/core/lib/iomgr/endpoint.h
  6. 3
      src/core/lib/iomgr/endpoint_cfstream.cc
  7. 3
      src/core/lib/iomgr/event_engine/endpoint.cc
  8. 2
      src/core/lib/iomgr/tcp_posix.cc
  9. 2
      src/core/lib/iomgr/tcp_windows.cc
  10. 6
      src/core/lib/security/transport/secure_endpoint.cc
  11. 6
      src/core/lib/security/transport/security_handshaker.cc
  12. 2
      test/core/bad_client/bad_client.cc
  13. 6
      test/core/end2end/bad_server_response_test.cc
  14. 18
      test/core/end2end/fixtures/http_proxy_fixture.cc
  15. 2
      test/core/handshake/readahead_handshaker_server_ssl.cc
  16. 8
      test/core/iomgr/endpoint_tests.cc
  17. 12
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  18. 11
      test/core/iomgr/tcp_posix_test.cc
  19. 3
      test/core/security/secure_endpoint_test.cc
  20. 5
      test/core/transport/chttp2/graceful_shutdown_test.cc
  21. 2
      test/core/transport/chttp2/settings_timeout_test.cc
  22. 5
      test/core/transport/chttp2/streams_not_seen_test.cc
  23. 3
      test/core/util/mock_endpoint.cc
  24. 3
      test/core/util/passthru_endpoint.cc
  25. 3
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -164,7 +164,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
}
}
@ -240,7 +240,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
return;
}
// Make sure we got a 2xx response.

@ -2666,7 +2666,8 @@ static void continue_read_action_locked(grpc_chttp2_transport* t) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent,
/*min_progress_size=*/1);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
}

@ -172,7 +172,8 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
void DoRead() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
Ref().release(); // ref held by pending read
grpc_endpoint_read(ep_, &incoming_, &on_read_, /*urgent=*/true);
grpc_endpoint_read(ep_, &incoming_, &on_read_, /*urgent=*/true,
/*min_progress_size=*/1);
}
static void OnRead(void* user_data, grpc_error_handle error) {

@ -23,8 +23,8 @@
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) {
ep->vtable->read(ep, slices, cb, urgent);
grpc_closure* cb, bool urgent, int min_progress_size) {
ep->vtable->read(ep, slices, cb, urgent, min_progress_size);
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,

@ -38,7 +38,7 @@ typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
bool urgent);
bool urgent, int min_progress_size);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
@ -58,7 +58,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even when the callback is
invoked with error != GRPC_ERROR_NONE. */
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent);
grpc_closure* cb, bool urgent, int min_progress_size);
absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep);

@ -237,7 +237,8 @@ static void WriteAction(void* arg, grpc_error_handle error) {
}
static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) {
grpc_closure* cb, bool urgent,
int /*min_progress_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,

@ -42,7 +42,8 @@ using ::grpc_event_engine::experimental::ResolvedAddressToURI;
using ::grpc_event_engine::experimental::SliceBuffer;
void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /* urgent */) {
grpc_closure* cb, bool /* urgent */,
int /* min_progress_size */) {
auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
if (eeep->endpoint == nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_CANCELLED);

@ -912,7 +912,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
}
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_closure* cb, bool urgent) {
grpc_closure* cb, bool urgent, int /*min_progress_size*/) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;

@ -237,7 +237,7 @@ static void on_read(void* tcpp, grpc_error_handle error) {
#define DEFAULT_TARGET_READ_SIZE 8192
#define MAX_WSABUF_COUNT 16
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb, bool urgent) {
grpc_closure* cb, bool urgent, int /*min_progress_size*/) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* handle = tcp->socket;
grpc_winsocket_callback_info* info = &handle->read_info;

@ -318,7 +318,8 @@ static void on_read(void* user_data, grpc_error_handle error) {
}
static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent) {
grpc_closure* cb, bool urgent,
int /*min_progress_size*/) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_cb = cb;
ep->read_buffer = slices;
@ -332,7 +333,8 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent,
/*min_progress_size=*/1);
}
static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,

@ -369,7 +369,7 @@ grpc_error_handle SecurityHandshaker::OnHandshakeNextDoneLocked(
&on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
return error;
}
if (result != TSI_OK) {
@ -402,7 +402,7 @@ grpc_error_handle SecurityHandshaker::OnHandshakeNextDoneLocked(
&on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
} else {
// Handshake has finished, check peer and so on.
error = CheckPeerLocked();
@ -508,7 +508,7 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg,
&h->on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
h.get(), grpc_schedule_on_exec_ctx),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
} else {
error = h->CheckPeerLocked();
if (error != GRPC_ERROR_NONE) {

@ -149,7 +149,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(sfd->client, &incoming, &read_done_closure,
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
grpc_core::ExecCtx::Get()->Flush();
do {
GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0);

@ -99,7 +99,7 @@ static void done_writing_settings_frame(void* /* arg */,
grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
static void handle_write() {
@ -138,7 +138,7 @@ static void handle_read(void* /*arg*/, grpc_error_handle error) {
handle_write();
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
}
@ -166,7 +166,7 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
&on_writing_settings_frame, nullptr);
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
}

@ -309,7 +309,8 @@ static void on_client_read_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false);
&conn->on_client_read_done, /*urgent=*/false,
/*min_progress_size=*/1);
}
static void on_client_read_done(void* arg, grpc_error_handle error) {
@ -351,7 +352,8 @@ static void on_server_read_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false);
&conn->on_server_read_done, /*urgent=*/false,
/*min_progress_size=*/1);
}
static void on_server_read_done(void* arg, grpc_error_handle error) {
@ -381,11 +383,13 @@ static void on_write_response_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false);
&conn->on_client_read_done, /*urgent=*/false,
/*min_progress_size=*/1);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false);
&conn->on_server_read_done, /*urgent=*/false,
/*min_progress_size=*/1);
}
static void on_write_response_done(void* arg, grpc_error_handle error) {
@ -485,7 +489,8 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false);
&conn->on_read_request_done, /*urgent=*/false,
/*min_progress_size=*/1);
return;
}
// Make sure we got a CONNECT request.
@ -580,7 +585,8 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false);
&conn->on_read_request_done, /*urgent=*/false,
/*min_progress_size=*/1);
}
//

@ -60,7 +60,7 @@ class ReadAheadHandshaker : public Handshaker {
grpc_closure* on_handshake_done,
HandshakerArgs* args) override {
grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
};

@ -123,7 +123,7 @@ static void read_scheduler(void* data, grpc_error_handle /* error */) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
static void read_and_write_test_read_handler(void* data,
@ -247,7 +247,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_core::ExecCtx::Get()->Flush();
grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(
@ -313,7 +313,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
wait_for_fail_count(&fail_count, 0);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
@ -321,7 +321,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
wait_for_fail_count(&fail_count, 2);
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,

@ -195,7 +195,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
while (read_slices.length < kBufferSize) {
std::promise<grpc_error_handle> read_promise;
init_event_closure(&read_done, &read_promise);
grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false,
/*min_progress_size=*/1);
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertEqual(read_future.get(), GRPC_ERROR_NONE);
@ -227,7 +228,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_init(&read_slices);
init_event_closure(&read_done, &read_promise);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
/*min_progress_size=*/1);
grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
@ -278,7 +280,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
init_event_closure(&read_done, &read_promise);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
/*min_progress_size=*/1);
grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
@ -320,7 +323,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
init_event_closure(&read_done, &read_promise);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
/*min_progress_size=*/1);
struct linger so_linger;
so_linger.l_onoff = 1;

@ -194,7 +194,7 @@ static void read_cb(void* user_data, grpc_error_handle error) {
} else {
gpr_mu_unlock(g_mu);
grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
/*urgent=*/false);
/*urgent=*/false, /*min_progress_size=*/1);
}
}
@ -235,7 +235,8 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
@ -292,7 +293,8 @@ static void large_read_test(size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
@ -543,7 +545,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {

@ -189,7 +189,8 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
grpc_slice_buffer_init(&incoming);
GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false);
grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false,
/*min_progress_size=*/1);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(n == 1);

@ -114,7 +114,8 @@ class GracefulShutdownTest : public ::testing::Test {
// Start reading on the client
grpc_slice_buffer_init(&read_buffer_);
GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false);
grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false,
/*min_progress_size=*/1);
}
// Shuts down and destroys the client and server.
@ -151,7 +152,7 @@ class GracefulShutdownTest : public ::testing::Test {
}
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->fds_.client, &self->read_buffer_,
&self->on_read_done_, false);
&self->on_read_done_, false, /*min_progress_size=*/1);
} else {
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();

@ -150,7 +150,7 @@ class Client {
while (true) {
EventState state;
grpc_endpoint_read(endpoint_, &read_buffer, state.closure(),
/*urgent=*/true);
/*urgent=*/true, /*min_progress_size=*/1);
if (!PollUntilDone(&state, deadline)) {
retval = false;
break;

@ -258,7 +258,8 @@ class StreamsNotSeenTest : public ::testing::Test {
StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
self->tcp_ = tcp;
grpc_endpoint_add_to_pollset(tcp, self->server_.pollset[0]);
grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false);
grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false,
/*min_progress_size=*/1);
std::thread([self]() {
ExecCtx exec_ctx;
// Send settings frame from server
@ -340,7 +341,7 @@ class StreamsNotSeenTest : public ::testing::Test {
}
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->tcp_, &self->read_buffer_, &self->on_read_done_,
false);
false, /*min_progress_size=*/1);
} else {
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();

@ -39,7 +39,8 @@ typedef struct mock_endpoint {
} mock_endpoint;
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) {
grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
gpr_mu_lock(&m->mu);
if (m->read_buffer.count > 0) {

@ -110,7 +110,8 @@ static void do_pending_read_op_locked(half* m, grpc_error_handle error) {
}
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) {
grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
half* m = reinterpret_cast<half*>(ep);
gpr_mu_lock(&m->parent->mu);
if (m->parent->shutdown) {

@ -96,7 +96,8 @@ class PhonyEndpoint : public grpc_endpoint {
}
static void read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/) {
grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
static_cast<PhonyEndpoint*>(ep)->QueueRead(slices, cb);
}

Loading…
Cancel
Save