Adding a max_frame_size argument to grpc_endpoint_write API to allow a sender to control frame sizes (#29526)

* adding a max_frame_size argument to grpc_endpoint_write API

* fix syntax error

* fix typo
pull/29505/head
Vignesh Babu 3 years ago committed by GitHub
parent 2b00c7d2ad
commit b92f885756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 4
      src/core/lib/http/httpcli.cc
  3. 4
      src/core/lib/iomgr/endpoint.cc
  4. 6
      src/core/lib/iomgr/endpoint.h
  5. 2
      src/core/lib/iomgr/endpoint_cfstream.cc
  6. 2
      src/core/lib/iomgr/event_engine/endpoint.cc
  7. 2
      src/core/lib/iomgr/tcp_posix.cc
  8. 2
      src/core/lib/iomgr/tcp_windows.cc
  9. 8
      src/core/lib/security/transport/secure_endpoint.cc
  10. 2
      src/core/lib/security/transport/security_handshaker.cc
  11. 3
      src/core/lib/transport/http_connect_handshaker.cc
  12. 4
      test/core/bad_client/bad_client.cc
  13. 7
      test/core/end2end/bad_server_response_test.cc
  14. 16
      test/core/end2end/fixtures/http_proxy_fixture.cc
  15. 5
      test/core/iomgr/endpoint_tests.cc
  16. 8
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  17. 4
      test/core/iomgr/tcp_posix_test.cc
  18. 4
      test/core/transport/chttp2/graceful_shutdown_test.cc
  19. 4
      test/core/transport/chttp2/streams_not_seen_test.cc
  20. 2
      test/core/util/mock_endpoint.cc
  21. 2
      test/core/util/passthru_endpoint.cc
  22. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -987,7 +987,7 @@ static void write_action(void* gt, grpc_error_handle /*error*/) {
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
grpc_schedule_on_exec_ctx),
cl);
cl, /*max_frame_size=*/INT_MAX);
}
static void write_action_end(void* tp, grpc_error_handle error) {

@ -20,6 +20,7 @@
#include "src/core/lib/http/httpcli.h"
#include <limits.h>
#include <string.h>
#include <string>
@ -281,7 +282,8 @@ void HttpRequest::StartWrite() {
grpc_slice_ref_internal(request_text_);
grpc_slice_buffer_add(&outgoing_, request_text_);
Ref().release(); // ref held by pending write
grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr);
grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr,
/*max_frame_size=*/INT_MAX);
}
void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {

@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg) {
ep->vtable->write(ep, slices, cb, arg);
grpc_closure* cb, void* arg, int max_frame_size) {
ep->vtable->write(ep, slices, cb, arg, max_frame_size);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {

@ -40,7 +40,7 @@ struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
bool urgent, int min_progress_size);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
void* arg);
void* arg, int max_frame_size);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@ -79,9 +79,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep);
it is a valid slice buffer.
\a arg is platform specific. It is currently only used by TCP on linux
platforms as an argument that would be forwarded to the timestamps callback.
\a max_frame_size. A hint to the endpoint implementation to construct
frames which do not exceed the specified size.
*/
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg);
grpc_closure* cb, void* arg, int max_frame_size);
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */

@ -255,7 +255,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg) {
grpc_closure* cb, void* arg, int /*max_frame_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",

@ -64,7 +64,7 @@ void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg) {
grpc_closure* cb, void* arg, int /*max_frame_size*/) {
// TODO(hork): adapt arg to some metrics collection mechanism.
(void)arg;
auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);

@ -1575,7 +1575,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */,
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb, void* arg) {
grpc_closure* cb, void* arg, int /*max_frame_size*/) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error_handle error = GRPC_ERROR_NONE;

@ -344,7 +344,7 @@ static void on_write(void* tcpp, grpc_error_handle error) {
/* Initiates a write. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg) {
grpc_closure* cb, void* arg, int /*max_frame_size*/) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;

@ -20,6 +20,8 @@
#include "src/core/lib/security/transport/secure_endpoint.h"
#include <limits.h>
#include <new>
#include <grpc/slice.h>
@ -349,7 +351,8 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
}
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg) {
grpc_closure* cb, void* arg,
int /*max_frame_size*/) {
GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
unsigned i;
@ -442,7 +445,8 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg,
/*max_frame_size=*/INT_MAX);
}
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error_handle why) {

@ -393,7 +393,7 @@ grpc_error_handle SecurityHandshaker::OnHandshakeNextDoneLocked(
&on_handshake_data_sent_to_peer_,
&SecurityHandshaker::OnHandshakeDataSentToPeerFnScheduler, this,
grpc_schedule_on_exec_ctx),
nullptr);
nullptr, /*max_frame_size=*/INT_MAX);
} else if (handshaker_result == nullptr) {
// There is nothing to send, but need to read from peer.
grpc_endpoint_read(

@ -20,6 +20,7 @@
#include "src/core/lib/transport/http_connect_handshaker.h"
#include <limits.h>
#include <string.h>
#include "absl/strings/str_cat.h"
@ -352,7 +353,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
GRPC_CLOSURE_INIT(&request_done_closure_,
&HttpConnectHandshaker::OnWriteDoneScheduler, this,
grpc_schedule_on_exec_ctx),
nullptr);
nullptr, /*max_frame_size=*/INT_MAX);
}
HttpConnectHandshaker::HttpConnectHandshaker() {

@ -18,6 +18,7 @@
#include "test/core/bad_client/bad_client.h"
#include <limits.h>
#include <stdio.h>
#include <grpc/support/alloc.h>
@ -120,7 +121,8 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_schedule_on_exec_ctx);
/* Write data */
grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr);
grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr,
/*max_frame_size=*/INT_MAX);
grpc_core::ExecCtx::Get()->Flush();
/* Await completion, unless the request is large and write may not finish

@ -16,6 +16,7 @@
*
*/
#include <limits.h>
#include <string.h>
#include <grpc/grpc.h>
@ -108,7 +109,8 @@ static void handle_write() {
grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr,
/*max_frame_size=*/INT_MAX);
}
static void handle_read(void* /*arg*/, grpc_error_handle error) {
@ -163,7 +165,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
HTTP2_SETTINGS_FRAME, sizeof(HTTP2_SETTINGS_FRAME) - 1);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer,
&on_writing_settings_frame, nullptr);
&on_writing_settings_frame, nullptr,
/*max_frame_size=*/INT_MAX);
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false, /*min_progress_size=*/1);

@ -18,6 +18,7 @@
#include "test/core/end2end/fixtures/http_proxy_fixture.h"
#include <limits.h>
#include <string.h>
#include "absl/strings/str_cat.h"
@ -227,7 +228,8 @@ static void on_client_write_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done, nullptr);
&conn->on_client_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
@ -262,7 +264,8 @@ static void on_server_write_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done, nullptr);
&conn->on_server_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
@ -303,7 +306,8 @@ static void on_client_read_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done, nullptr);
&conn->on_server_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
@ -346,7 +350,8 @@ static void on_server_read_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done, nullptr);
&conn->on_client_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
@ -424,7 +429,8 @@ static void on_server_connect_done_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_write_response_done, nullptr);
&conn->on_write_response_done, nullptr,
/*max_frame_size=*/INT_MAX);
}
static void on_server_connect_done(void* arg, grpc_error_handle error) {

@ -18,6 +18,7 @@
#include "test/core/iomgr/endpoint_tests.h"
#include <limits.h>
#include <stdbool.h>
#include <sys/types.h>
@ -152,7 +153,7 @@ static void write_scheduler(void* data, grpc_error_handle /* error */) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
nullptr);
nullptr, /*max_frame_size=*/INT_MAX);
}
static void read_and_write_test_write_handler(void* data,
@ -327,7 +328,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
nullptr);
nullptr, /*max_frame_size=*/INT_MAX);
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));

@ -22,6 +22,8 @@
#ifdef GRPC_CFSTREAM
#include <limits.h>
#include <netinet/in.h>
#include <grpc/grpc.h>
@ -173,7 +175,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
@ -235,7 +237,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
@ -288,7 +290,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);

@ -24,6 +24,7 @@
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
@ -467,7 +468,8 @@ static void write_test(size_t num_bytes, size_t slice_size,
grpc_endpoint_write(ep, &outgoing, &write_done_closure,
grpc_event_engine_can_track_errors() && collect_timestamps
? &done_timestamps
: nullptr);
: nullptr,
/*max_frame_size=*/INT_MAX);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
exec_ctx.Flush();
gpr_mu_lock(g_mu);

@ -18,6 +18,7 @@
#include <grpc/support/port_platform.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
@ -212,7 +213,8 @@ class GracefulShutdownTest : public ::testing::Test {
absl::Notification on_write_done_notification_;
GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
&on_write_done_notification_, nullptr);
grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr);
grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr,
/*max_frame_size=*/INT_MAX);
ExecCtx::Get()->Flush();
GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5)));

@ -18,6 +18,7 @@
#include <grpc/support/port_platform.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
@ -315,7 +316,8 @@ class StreamsNotSeenTest : public ::testing::Test {
absl::Notification on_write_done_notification_;
GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
&on_write_done_notification_, nullptr);
grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr);
grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
/*max_frame_size=*/INT_MAX);
ExecCtx::Get()->Flush();
GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5)));

@ -54,7 +54,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* /*arg*/) {
grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);

@ -259,7 +259,7 @@ static void do_pending_write_op_locked(half* m, grpc_error_handle error) {
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* /*arg*/) {
grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) {
half* m = reinterpret_cast<half*>(ep);
gpr_mu_lock(&m->parent->mu);
gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);

@ -102,7 +102,7 @@ class PhonyEndpoint : public grpc_endpoint {
}
static void write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/,
grpc_closure* cb, void* /*arg*/) {
grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
}

Loading…
Cancel
Save