From 04c97d0e0daec7c47d0ee5fcb8038270dd2d3328 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 9 Nov 2017 09:14:14 -0800 Subject: [PATCH] Add notify_on_receive_settings closure to chttp2 transport. --- .../chttp2/client/chttp2_connector.cc | 26 ++++++++++++++++++- .../client/insecure/channel_create_posix.cc | 2 +- .../transport/chttp2/server/chttp2_server.cc | 4 ++- .../server/insecure/server_chttp2_posix.cc | 2 +- .../chttp2/transport/chttp2_transport.cc | 19 +++++++++----- .../chttp2/transport/chttp2_transport.h | 8 +++--- .../chttp2/transport/frame_settings.cc | 5 ++++ .../ext/transport/chttp2/transport/internal.h | 2 ++ test/core/bad_client/bad_client.cc | 2 +- .../end2end/fixtures/h2_sockpair+trace.cc | 4 +-- test/core/end2end/fixtures/h2_sockpair.cc | 4 +-- .../end2end/fixtures/h2_sockpair_1byte.cc | 4 +-- test/core/end2end/fuzzers/api_fuzzer.cc | 2 +- test/core/end2end/fuzzers/client_fuzzer.cc | 2 +- test/core/end2end/fuzzers/server_fuzzer.cc | 2 +- .../microbenchmarks/bm_chttp2_transport.cc | 2 +- test/cpp/microbenchmarks/fullstack_fixtures.h | 6 +++-- test/cpp/performance/writes_per_rpc_test.cc | 6 +++-- 18 files changed, 73 insertions(+), 29 deletions(-) diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 4efd1293847..5870a3e6d55 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -120,8 +120,32 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, c->result->transport = grpc_create_chttp2_transport( exec_ctx, args->args, args->endpoint, true); GPR_ASSERT(c->result->transport); + // TODO(roth): We ideally want to wait until we receive HTTP/2 + // settings from the server before we consider the connection + // established. If that doesn't happen before the connection + // timeout expires, then we should consider the connection attempt a + // failure and feed that information back into the backoff code. + // We could pass a notify_on_receive_settings callback to + // grpc_chttp2_transport_start_reading() to let us know when + // settings are received, but we would need to figure out how to use + // that information here. + // + // Unfortunately, we don't currently have a way to split apart the two + // effects of scheduling c->notify: we start sending RPCs immediately + // (which we want to do) and we consider the connection attempt successful + // (which we don't want to do until we get the notify_on_receive_settings + // callback from the transport). If we could split those things + // apart, then we could start sending RPCs but then wait for our + // timeout before deciding if the connection attempt is successful. + // If the attempt is not successful, then we would tear down the + // transport and feed the failure back into the backoff code. + // + // In addition, even if we did that, we would probably not want to do + // so until after transparent retries is implemented. Otherwise, any + // RPC that we attempt to send on the connection before the timeout + // would fail instead of being retried on a subsequent attempt. grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - args->read_buffer); + args->read_buffer, nullptr); c->result->channel_args = args->args; } grpc_closure* notify = c->notify; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index fcc2f4249aa..ad64f740b85 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -58,7 +58,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( grpc_channel* channel = grpc_channel_create( &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); grpc_channel_args_destroy(&exec_ctx, final_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 1b4d89b5ee6..39e8dfd684d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -93,8 +93,10 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_server_setup_transport( exec_ctx, connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); +// FIXME: set notify_on_receive_settings callback and use it to enforce +// handshaking deadline grpc_chttp2_transport_start_reading(exec_ctx, transport, - args->read_buffer); + args->read_buffer, nullptr); grpc_channel_args_destroy(exec_ctx, args->args); } } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 70d48647100..09ee14c0228 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -61,7 +61,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, } grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index b4edb17cdec..21808115d70 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1788,7 +1788,6 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, grpc_transport_op* op = (grpc_transport_op*)stream_op; grpc_chttp2_transport* t = (grpc_chttp2_transport*)op->handler_private.extra_arg; - grpc_error* close_transport = op->disconnect_with_error; if (op->goaway_error) { send_goaway(exec_ctx, t, op->goaway_error); @@ -1820,8 +1819,13 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, op->on_connectivity_state_change); } - if (close_transport != GRPC_ERROR_NONE) { - close_transport_locked(exec_ctx, t, close_transport); + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + close_transport_locked(exec_ctx, t, op->disconnect_with_error); + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings, + GRPC_ERROR_CANCELLED); + t->notify_on_receive_settings = nullptr; + } } GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); @@ -3231,15 +3235,16 @@ grpc_transport* grpc_create_chttp2_transport( return &t->base; } -void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, - grpc_slice_buffer* read_buffer) { +void grpc_chttp2_transport_start_reading( + grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ - if (read_buffer != NULL) { + if (read_buffer != nullptr) { grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } + t->notify_on_receive_settings = notify_on_receive_settings; GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 4fe12d42e9b..a349e00498c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -41,9 +41,11 @@ grpc_transport* grpc_create_chttp2_transport( /// Takes ownership of \a read_buffer, which (if non-NULL) contains /// leftover bytes previously read from the endpoint (e.g., by handshakers). -void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, - grpc_slice_buffer* read_buffer); +/// If non-null, \a notify_on_receive_settings will be scheduled when +/// HTTP/2 settings are received from the peer. +void grpc_chttp2_transport_start_reading( + grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings); #ifdef __cplusplus } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index d33da721a52..a1da6075169 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -131,6 +131,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p, memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings, + GRPC_ERROR_NONE); + t->notify_on_receive_settings = nullptr; + } } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index a5a0a804a28..b4fe6fdcbe7 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -245,6 +245,8 @@ struct grpc_chttp2_transport { grpc_combiner* combiner; + grpc_closure* notify_on_receive_settings; + /** write execution state of the transport */ grpc_chttp2_write_state write_state; /** is this the first write in a series of writes? diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 5ab5436d2f1..4d594d79518 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -117,7 +117,7 @@ void grpc_run_bad_client_test( grpc_server_start(a.server); transport = grpc_create_chttp2_transport(&exec_ctx, NULL, sfd.server, false); server_setup_transport(&a, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); /* Bind everything into the same pollset */ diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 8914af499b9..a1dea102250 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -100,7 +100,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -116,7 +116,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, transport = grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr. nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index b79c8e7a25e..b2d946c88a3 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -94,7 +94,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -110,7 +110,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, transport = grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 529866b3a76..4e847a1d160 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -105,7 +105,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -121,7 +121,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, transport = grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index d625ec8b0d9..296b0a335f6 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -466,7 +466,7 @@ static void do_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_transport* transport = grpc_create_chttp2_transport(exec_ctx, NULL, server, false); grpc_server_setup_transport(exec_ctx, g_server, transport, NULL, NULL); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr); GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE); } else { diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index f61067b2776..1046d081394 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -55,7 +55,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL); grpc_transport* transport = grpc_create_chttp2_transport(&exec_ctx, NULL, mock_endpoint, true); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr. nullptr); grpc_channel* channel = grpc_channel_create( &exec_ctx, "test-target", NULL, GRPC_CLIENT_DIRECT_CHANNEL, transport); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index 4754712ad06..fad5df12461 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -63,7 +63,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_transport* transport = grpc_create_chttp2_transport(&exec_ctx, NULL, mock_endpoint, false); grpc_server_setup_transport(&exec_ctx, server, transport, NULL, NULL); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_call* call1 = NULL; grpc_call_details call_details1; diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 154cc917785..d5d423c8e89 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -137,7 +137,7 @@ class Fixture { grpc_channel_args c_args = args.c_channel_args(); ep_ = new DummyEndpoint; t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); - grpc_chttp2_transport_start_reading(exec_ctx(), t_, NULL); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr); FlushExecCtx(); } diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index 7db23234b69..d59b00b048c 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -186,7 +186,8 @@ class EndpointPairFixture : public BaseFixture { grpc_server_setup_transport(&exec_ctx, server_->c_server(), server_transport_, NULL, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, + nullptr, nullptr); } /* create channel */ @@ -202,7 +203,8 @@ class EndpointPairFixture : public BaseFixture { grpc_channel* channel = grpc_channel_create(&exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); - grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, + nullptr, nullptr); channel_ = CreateChannelInternal("", channel); } diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index ecf67a2c273..1d8b30f0ad6 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -101,7 +101,8 @@ class EndpointPairFixture { grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, NULL, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); } /* create channel */ @@ -116,7 +117,8 @@ class EndpointPairFixture { GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); channel_ = CreateChannelInternal("", channel); }