diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 37330222543..44e70a1662c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -214,6 +214,12 @@ static void finish_keepalive_ping_locked( grpc_core::RefCountedPtr t, grpc_error_handle error); static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t); +static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error, + bool immediate_disconnect_hint); + +// Timeout for getting an ack back on settings changes +#define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout" + namespace { grpc_core::CallTracerInterface* CallTracerIfEnabled(grpc_chttp2_stream* s) { if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) { @@ -409,6 +415,11 @@ static void read_channel_args(grpc_chttp2_transport* t, : false); } + t->settings_timeout = + channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT) + .value_or(std::max(t->keepalive_timeout * 2, + grpc_core::Duration::Minutes(1))); + // Only send the prefered rx frame size http2 setting if we are instructed // to auto size the buffers allocated at tcp level and we also can adjust // sending frame size. @@ -705,6 +716,12 @@ static void close_transport_locked(grpc_chttp2_transport* t, t->closed_with_error = error; connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(), "close_transport"); + if (t->settings_ack_watchdog != + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) { + t->event_engine->Cancel(std::exchange( + t->settings_ack_watchdog, + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid)); + } if (t->delayed_ping_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->delayed_ping_timer_handle)) { t->delayed_ping_timer_handle.reset(); @@ -1727,6 +1744,12 @@ void grpc_chttp2_ping_timeout( grpc_core::NewClosure([t](grpc_error_handle) { gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.", std::string(t->peer_string.as_string_view()).c_str()); + send_goaway( + t.get(), + grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"), + grpc_core::StatusIntProperty::kHttp2Error, + GRPC_HTTP2_ENHANCE_YOUR_CALM), + /*immediate_disconnect_hint=*/true); close_transport_locked( t.get(), grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"), @@ -1736,6 +1759,27 @@ void grpc_chttp2_ping_timeout( absl::OkStatus()); } +void grpc_chttp2_settings_timeout( + grpc_core::RefCountedPtr t) { + t->combiner->Run( + grpc_core::NewClosure([t](grpc_error_handle) { + gpr_log(GPR_INFO, "%s: Settings timeout. Closing transport.", + std::string(t->peer_string.as_string_view()).c_str()); + send_goaway( + t.get(), + grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"), + grpc_core::StatusIntProperty::kHttp2Error, + GRPC_HTTP2_SETTINGS_TIMEOUT), + /*immediate_disconnect_hint=*/true); + close_transport_locked( + t.get(), + grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"), + grpc_core::StatusIntProperty::kRpcStatus, + GRPC_STATUS_UNAVAILABLE)); + }), + absl::OkStatus()); +} + namespace { // Fire and forget (deletes itself on completion). Does a graceful shutdown by diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4448e8b98f1..00706abf776 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -324,6 +324,10 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// settings values uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + grpc_event_engine::experimental::EventEngine::TaskHandle + settings_ack_watchdog = + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; + /// what is the next stream id to be allocated by this peer? /// copied to next_stream_id in parsing when parsing commences uint32_t next_stream_id = 0; @@ -439,6 +443,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { uint32_t num_pending_induced_frames = 0; uint32_t incoming_stream_id = 0; + /// grace period before settings timeout expires + grpc_core::Duration settings_timeout; + /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? /// uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; @@ -726,6 +733,9 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, void grpc_chttp2_ping_timeout( grpc_core::RefCountedPtr t); +void grpc_chttp2_settings_timeout( + grpc_core::RefCountedPtr t); + #define GRPC_HEADER_SIZE_IN_BYTES 5 #define MAX_SIZE_T (~(size_t)0) diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 010a65c61f6..3e87c443ffd 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -23,7 +23,9 @@ #include #include +#include #include +#include #include "absl/base/attributes.h" #include "absl/container/flat_hash_map.h" @@ -34,6 +36,7 @@ #include "absl/strings/string_view.h" #include "absl/types/variant.h" +#include #include #include #include @@ -836,6 +839,12 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) { t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), t, nullptr); + if (t->settings_ack_watchdog != + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) { + t->event_engine->Cancel(std::exchange( + t->settings_ack_watchdog, + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid)); + } t->sent_local_settings = false; // This is more streams than can be started in http2, so setting this // effictively removes the limit for the rest of the connection. diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index e6406880745..def1b7ef0a2 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -262,6 +262,20 @@ class WriteContext { t_->settings[GRPC_LOCAL_SETTINGS], t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); + if (grpc_core::IsSettingsTimeoutEnabled() && + t_->keepalive_timeout != grpc_core::Duration::Infinity()) { + GPR_ASSERT( + t_->settings_ack_watchdog == + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid); + // We base settings timeout on keepalive timeout, but double it to allow + // for implementations taking some more time about acking a setting. + t_->settings_ack_watchdog = t_->event_engine->RunAfter( + t_->settings_timeout, [t = t_->Ref()]() mutable { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_chttp2_settings_timeout(std::move(t)); + }); + } t_->force_send_settings = false; t_->dirtied_local_settings = false; t_->sent_local_settings = true; diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index e8633c6ab85..c2f9ab002a9 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -130,6 +130,9 @@ const char* const description_call_status_override_on_cancellation = "with cancellation."; const char* const additional_constraints_call_status_override_on_cancellation = "{}"; +const char* const description_settings_timeout = + "If set, use the settings timeout to send settings frame to the peer."; +const char* const additional_constraints_settings_timeout = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -224,6 +227,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, kDefaultForDebugOnly, true}, + {"settings_timeout", description_settings_timeout, + additional_constraints_settings_timeout, true, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, true, true}, @@ -351,6 +356,9 @@ const char* const description_call_status_override_on_cancellation = "with cancellation."; const char* const additional_constraints_call_status_override_on_cancellation = "{}"; +const char* const description_settings_timeout = + "If set, use the settings timeout to send settings frame to the peer."; +const char* const additional_constraints_settings_timeout = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -445,6 +453,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, kDefaultForDebugOnly, true}, + {"settings_timeout", description_settings_timeout, + additional_constraints_settings_timeout, true, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, true, true}, @@ -572,6 +582,9 @@ const char* const description_call_status_override_on_cancellation = "with cancellation."; const char* const additional_constraints_call_status_override_on_cancellation = "{}"; +const char* const description_settings_timeout = + "If set, use the settings timeout to send settings frame to the peer."; +const char* const additional_constraints_settings_timeout = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -666,6 +679,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, kDefaultForDebugOnly, true}, + {"settings_timeout", description_settings_timeout, + additional_constraints_settings_timeout, true, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, true, true}, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 3f2f865a516..4f4befd57c5 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -103,6 +103,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; #endif } +#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT +inline bool IsSettingsTimeoutEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS @@ -161,6 +163,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; #endif } +#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT +inline bool IsSettingsTimeoutEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS @@ -219,6 +223,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; #endif } +#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT +inline bool IsSettingsTimeoutEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS @@ -262,6 +268,7 @@ enum ExperimentIds { kExperimentIdMultiping, kExperimentIdRegisteredMethodLookupInTransport, kExperimentIdCallStatusOverrideOnCancellation, + kExperimentIdSettingsTimeout, kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdChttp2BatchRequests, kExperimentIdChttp2OffloadOnRstStream, @@ -385,6 +392,10 @@ inline bool IsRegisteredMethodLookupInTransportEnabled() { inline bool IsCallStatusOverrideOnCancellationEnabled() { return IsExperimentEnabled(kExperimentIdCallStatusOverrideOnCancellation); } +#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT +inline bool IsSettingsTimeoutEnabled() { + return IsExperimentEnabled(kExperimentIdSettingsTimeout); +} #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 4686e0fb33e..3f801b94fd8 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -222,6 +222,12 @@ expiry: 2024/01/01 owner: vigneshbabu@google.com test_tags: [] +- name: settings_timeout + description: + If set, use the settings timeout to send settings frame to the peer. + expiry: 2024/03/03 + owner: ctiller@google.com + test_tags: [] - name: work_serializer_clears_time_cache description: Have the work serializer clear the time cache when it dispatches work. diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index faa8267dc21..c34edfc9ed5 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -106,6 +106,8 @@ default: debug - name: work_serializer_clears_time_cache default: true +- name: settings_timeout + default: true - name: chttp2_batch_requests default: true - name: chttp2_offload_on_rst_stream