[chttp2] Enforce settings acks (#34640)

Previously our settings changes carried no timeout, fix that.

Default timeout starts at keepalive_timeout*2.
pull/34648/head^2
Craig Tiller 1 year ago committed by GitHub
parent c38837a741
commit 385583b28a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 10
      src/core/ext/transport/chttp2/transport/internal.h
  3. 9
      src/core/ext/transport/chttp2/transport/parsing.cc
  4. 14
      src/core/ext/transport/chttp2/transport/writing.cc
  5. 15
      src/core/lib/experiments/experiments.cc
  6. 11
      src/core/lib/experiments/experiments.h
  7. 6
      src/core/lib/experiments/experiments.yaml
  8. 2
      src/core/lib/experiments/rollouts.yaml

@ -214,6 +214,12 @@ static void finish_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error); grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t); static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
bool immediate_disconnect_hint);
// Timeout for getting an ack back on settings changes
#define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"
namespace { namespace {
grpc_core::CallTracerInterface* CallTracerIfEnabled(grpc_chttp2_stream* s) { grpc_core::CallTracerInterface* CallTracerIfEnabled(grpc_chttp2_stream* s) {
if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) { if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
@ -409,6 +415,11 @@ static void read_channel_args(grpc_chttp2_transport* t,
: false); : false);
} }
t->settings_timeout =
channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT)
.value_or(std::max(t->keepalive_timeout * 2,
grpc_core::Duration::Minutes(1)));
// Only send the prefered rx frame size http2 setting if we are instructed // Only send the prefered rx frame size http2 setting if we are instructed
// to auto size the buffers allocated at tcp level and we also can adjust // to auto size the buffers allocated at tcp level and we also can adjust
// sending frame size. // sending frame size.
@ -705,6 +716,12 @@ static void close_transport_locked(grpc_chttp2_transport* t,
t->closed_with_error = error; t->closed_with_error = error;
connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(), connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close_transport"); "close_transport");
if (t->settings_ack_watchdog !=
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
t->event_engine->Cancel(std::exchange(
t->settings_ack_watchdog,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
}
if (t->delayed_ping_timer_handle.has_value()) { if (t->delayed_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->delayed_ping_timer_handle)) { if (t->event_engine->Cancel(*t->delayed_ping_timer_handle)) {
t->delayed_ping_timer_handle.reset(); t->delayed_ping_timer_handle.reset();
@ -1727,6 +1744,12 @@ void grpc_chttp2_ping_timeout(
grpc_core::NewClosure([t](grpc_error_handle) { grpc_core::NewClosure([t](grpc_error_handle) {
gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.", gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.",
std::string(t->peer_string.as_string_view()).c_str()); std::string(t->peer_string.as_string_view()).c_str());
send_goaway(
t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"),
grpc_core::StatusIntProperty::kHttp2Error,
GRPC_HTTP2_ENHANCE_YOUR_CALM),
/*immediate_disconnect_hint=*/true);
close_transport_locked( close_transport_locked(
t.get(), t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"), grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
@ -1736,6 +1759,27 @@ void grpc_chttp2_ping_timeout(
absl::OkStatus()); absl::OkStatus());
} }
void grpc_chttp2_settings_timeout(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
t->combiner->Run(
grpc_core::NewClosure([t](grpc_error_handle) {
gpr_log(GPR_INFO, "%s: Settings timeout. Closing transport.",
std::string(t->peer_string.as_string_view()).c_str());
send_goaway(
t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"),
grpc_core::StatusIntProperty::kHttp2Error,
GRPC_HTTP2_SETTINGS_TIMEOUT),
/*immediate_disconnect_hint=*/true);
close_transport_locked(
t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
}),
absl::OkStatus());
}
namespace { namespace {
// Fire and forget (deletes itself on completion). Does a graceful shutdown by // Fire and forget (deletes itself on completion). Does a graceful shutdown by

@ -324,6 +324,10 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// settings values /// settings values
uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
grpc_event_engine::experimental::EventEngine::TaskHandle
settings_ack_watchdog =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
/// what is the next stream id to be allocated by this peer? /// what is the next stream id to be allocated by this peer?
/// copied to next_stream_id in parsing when parsing commences /// copied to next_stream_id in parsing when parsing commences
uint32_t next_stream_id = 0; uint32_t next_stream_id = 0;
@ -439,6 +443,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
uint32_t num_pending_induced_frames = 0; uint32_t num_pending_induced_frames = 0;
uint32_t incoming_stream_id = 0; uint32_t incoming_stream_id = 0;
/// grace period before settings timeout expires
grpc_core::Duration settings_timeout;
/// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
/// ///
uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
@ -726,6 +733,9 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
void grpc_chttp2_ping_timeout( void grpc_chttp2_ping_timeout(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t); grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
void grpc_chttp2_settings_timeout(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
#define GRPC_HEADER_SIZE_IN_BYTES 5 #define GRPC_HEADER_SIZE_IN_BYTES 5
#define MAX_SIZE_T (~(size_t)0) #define MAX_SIZE_T (~(size_t)0)

@ -23,7 +23,9 @@
#include <initializer_list> #include <initializer_list>
#include <limits> #include <limits>
#include <memory>
#include <string> #include <string>
#include <utility>
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_map.h"
@ -34,6 +36,7 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/variant.h" #include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h> #include <grpc/slice.h>
#include <grpc/slice_buffer.h> #include <grpc/slice_buffer.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -836,6 +839,12 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
t->settings[GRPC_ACKED_SETTINGS] t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, nullptr); t, nullptr);
if (t->settings_ack_watchdog !=
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
t->event_engine->Cancel(std::exchange(
t->settings_ack_watchdog,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
}
t->sent_local_settings = false; t->sent_local_settings = false;
// This is more streams than can be started in http2, so setting this // This is more streams than can be started in http2, so setting this
// effictively removes the limit for the rest of the connection. // effictively removes the limit for the rest of the connection.

@ -262,6 +262,20 @@ class WriteContext {
t_->settings[GRPC_LOCAL_SETTINGS], t_->settings[GRPC_LOCAL_SETTINGS],
t_->force_send_settings, t_->force_send_settings,
GRPC_CHTTP2_NUM_SETTINGS)); GRPC_CHTTP2_NUM_SETTINGS));
if (grpc_core::IsSettingsTimeoutEnabled() &&
t_->keepalive_timeout != grpc_core::Duration::Infinity()) {
GPR_ASSERT(
t_->settings_ack_watchdog ==
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid);
// We base settings timeout on keepalive timeout, but double it to allow
// for implementations taking some more time about acking a setting.
t_->settings_ack_watchdog = t_->event_engine->RunAfter(
t_->settings_timeout, [t = t_->Ref()]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_settings_timeout(std::move(t));
});
}
t_->force_send_settings = false; t_->force_send_settings = false;
t_->dirtied_local_settings = false; t_->dirtied_local_settings = false;
t_->sent_local_settings = true; t_->sent_local_settings = true;

@ -130,6 +130,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation."; "with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation = const char* const additional_constraints_call_status_override_on_cancellation =
"{}"; "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
const char* const description_work_serializer_clears_time_cache = const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work."; "Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache = const char* const additional_constraints_work_serializer_clears_time_cache =
@ -224,6 +227,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation, description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true}, kDefaultForDebugOnly, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache", {"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache, description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true}, additional_constraints_work_serializer_clears_time_cache, true, true},
@ -351,6 +356,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation."; "with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation = const char* const additional_constraints_call_status_override_on_cancellation =
"{}"; "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
const char* const description_work_serializer_clears_time_cache = const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work."; "Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache = const char* const additional_constraints_work_serializer_clears_time_cache =
@ -445,6 +453,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation, description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true}, kDefaultForDebugOnly, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache", {"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache, description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true}, additional_constraints_work_serializer_clears_time_cache, true, true},
@ -572,6 +582,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation."; "with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation = const char* const additional_constraints_call_status_override_on_cancellation =
"{}"; "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
const char* const description_work_serializer_clears_time_cache = const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work."; "Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache = const char* const additional_constraints_work_serializer_clears_time_cache =
@ -666,6 +679,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation, description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true}, kDefaultForDebugOnly, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache", {"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache, description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true}, additional_constraints_work_serializer_clears_time_cache, true, true},

@ -103,6 +103,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true; return true;
#endif #endif
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS
@ -161,6 +163,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true; return true;
#endif #endif
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS
@ -219,6 +223,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true; return true;
#endif #endif
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS #define GRPC_EXPERIMENT_IS_INCLUDED_CHTTP2_BATCH_REQUESTS
@ -262,6 +268,7 @@ enum ExperimentIds {
kExperimentIdMultiping, kExperimentIdMultiping,
kExperimentIdRegisteredMethodLookupInTransport, kExperimentIdRegisteredMethodLookupInTransport,
kExperimentIdCallStatusOverrideOnCancellation, kExperimentIdCallStatusOverrideOnCancellation,
kExperimentIdSettingsTimeout,
kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdChttp2BatchRequests, kExperimentIdChttp2BatchRequests,
kExperimentIdChttp2OffloadOnRstStream, kExperimentIdChttp2OffloadOnRstStream,
@ -385,6 +392,10 @@ inline bool IsRegisteredMethodLookupInTransportEnabled() {
inline bool IsCallStatusOverrideOnCancellationEnabled() { inline bool IsCallStatusOverrideOnCancellationEnabled() {
return IsExperimentEnabled(kExperimentIdCallStatusOverrideOnCancellation); return IsExperimentEnabled(kExperimentIdCallStatusOverrideOnCancellation);
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() {
return IsExperimentEnabled(kExperimentIdSettingsTimeout);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { inline bool IsWorkSerializerClearsTimeCacheEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache); return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache);

@ -222,6 +222,12 @@
expiry: 2024/01/01 expiry: 2024/01/01
owner: vigneshbabu@google.com owner: vigneshbabu@google.com
test_tags: [] test_tags: []
- name: settings_timeout
description:
If set, use the settings timeout to send settings frame to the peer.
expiry: 2024/03/03
owner: ctiller@google.com
test_tags: []
- name: work_serializer_clears_time_cache - name: work_serializer_clears_time_cache
description: description:
Have the work serializer clear the time cache when it dispatches work. Have the work serializer clear the time cache when it dispatches work.

@ -106,6 +106,8 @@
default: debug default: debug
- name: work_serializer_clears_time_cache - name: work_serializer_clears_time_cache
default: true default: true
- name: settings_timeout
default: true
- name: chttp2_batch_requests - name: chttp2_batch_requests
default: true default: true
- name: chttp2_offload_on_rst_stream - name: chttp2_offload_on_rst_stream

Loading…
Cancel
Save