From fe5aace3116849f473ee295f68f696d54878bfb8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 16 Nov 2022 23:26:48 -0800 Subject: [PATCH] [flow_control] Fixes for the flow_control_fixes experiment found via fuzzing (#31676) * [flow_control] Enable experiment in debug builds, fix bugs found by fuzzer * fix * disable experiment * flowctl * flowctl * Automated change: Fix sanity tests * fix * fix Co-authored-by: ctiller --- .../chttp2/transport/flow_control.cc | 26 ++++- .../transport/chttp2/transport/flow_control.h | 4 +- .../ext/transport/chttp2/transport/parsing.cc | 8 +- .../transport/chttp2/flow_control_fuzzer.cc | 20 +++- ...h-31a50b8ef9ccb9a81da0f55f5a77683967102625 | 108 ++++++++++++++++++ ...h-66533e16501fe7e153ecc9c74d296bd4984979e2 | 103 +++++++++++++++++ ...h-7c7c95a473a7ff617a23f87eea1537e0ee40eff6 | 100 ++++++++++++++++ ...h-dba053769d617a54fa52edfbd30e70441a4a6a49 | 100 ++++++++++++++++ .../transport/chttp2/flow_control_test.cc | 12 +- 9 files changed, 466 insertions(+), 15 deletions(-) create mode 100644 test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-31a50b8ef9ccb9a81da0f55f5a77683967102625 create mode 100644 test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-66533e16501fe7e153ecc9c74d296bd4984979e2 create mode 100644 test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-7c7c95a473a7ff617a23f87eea1537e0ee40eff6 create mode 100644 test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-dba053769d617a54fa52edfbd30e70441a4a6a49 diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 78e34fefa73..48eae6e67b6 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -278,16 +278,21 @@ void TransportFlowControl::UpdateSetting( Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value, grpc_chttp2_settings_parameters[id].max_value); if (new_desired_value != *desired_value) { - *desired_value = new_desired_value; + if (grpc_flowctl_trace.enabled()) { + gpr_log(GPR_INFO, "[flowctl] UPDATE SETTING %s from %" PRId64 " to %d", + grpc_chttp2_settings_parameters[id].name, *desired_value, + new_desired_value); + } // Reaching zero can only happen for initial window size, and if it occurs // we really want to wake up writes and ensure all the queued stream // window updates are flushed, since stream flow control operates // differently at zero window size. FlowControlAction::Urgency urgency = FlowControlAction::Urgency::QUEUE_UPDATE; - if (new_desired_value == 0) { + if (*desired_value == 0 || new_desired_value == 0) { urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY; } + *desired_value = new_desired_value; (action->*set)(urgency, *desired_value); } } else { @@ -302,6 +307,21 @@ void TransportFlowControl::UpdateSetting( } } +FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) { + acked_init_window_ = value; + FlowControlAction action; + if (IsFlowControlFixesEnabled() && + acked_init_window_ != target_initial_window_size_) { + FlowControlAction::Urgency urgency = + FlowControlAction::Urgency::QUEUE_UPDATE; + if (acked_init_window_ == 0 || target_initial_window_size_ == 0) { + urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY; + } + action.set_send_initial_window_update(urgency, target_initial_window_size_); + } + return action; +} + FlowControlAction TransportFlowControl::PeriodicUpdate() { FlowControlAction action; if (enable_bdp_probe_) { @@ -393,7 +413,7 @@ uint32_t StreamFlowControl::MaybeSendUpdate() { pending_size_ = absl::nullopt; tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce); GPR_ASSERT(DesiredAnnounceSize() == 0); - tfc_upd.MakeAction(); + std::ignore = tfc_upd.MakeAction(); return static_cast(announce); } diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 48c531eb371..e4d71d2e133 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -74,7 +74,7 @@ enum class StallEdge { kNoChange, kStalled, kUnstalled }; // Encapsulates a collections of actions the transport needs to take with // regard to flow control. Each action comes with urgencies that tell the // transport how quickly the action must take place. -class FlowControlAction { +class GRPC_MUST_USE_RESULT FlowControlAction { public: enum class Urgency : uint8_t { // Nothing to be done. @@ -261,7 +261,7 @@ class TransportFlowControl final { uint32_t acked_init_window() const { return acked_init_window_; } uint32_t sent_init_window() const { return target_initial_window_size_; } - void SetAckedInitialWindow(uint32_t value) { acked_init_window_ = value; } + FlowControlAction SetAckedInitialWindow(uint32_t value); // Getters int64_t remote_window() const { return remote_window_; } diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 4488e6dc6b7..95ebb29e776 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -697,9 +697,11 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) { t->hpack_parser.hpack_table()->SetMaxBytes( t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - t->flow_control.SetAckedInitialWindow( - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_chttp2_act_on_flowctl_action( + t->flow_control.SetAckedInitialWindow( + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + t, nullptr); t->sent_local_settings = false; } t->parser = grpc_chttp2_settings_parser_parse; diff --git a/test/core/transport/chttp2/flow_control_fuzzer.cc b/test/core/transport/chttp2/flow_control_fuzzer.cc index f9a9c50e179..ab3ad3f9a3d 100644 --- a/test/core/transport/chttp2/flow_control_fuzzer.cc +++ b/test/core/transport/chttp2/flow_control_fuzzer.cc @@ -29,6 +29,7 @@ #include "absl/base/attributes.h" #include "absl/status/status.h" +#include "absl/strings/str_join.h" #include "absl/types/optional.h" #include @@ -220,7 +221,9 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) { fprintf(stderr, "Received ACK for initial window size %d\n", *sent_from_remote.ack_initial_window_size); } - tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size); + PerformAction(tfc_->SetAckedInitialWindow( + *sent_from_remote.ack_initial_window_size), + nullptr); sending_initial_window_size_ = false; } if (sent_from_remote.bdp_pong) { @@ -356,6 +359,7 @@ void FlowControlFuzzer::AssertNoneStuck() const { std::map reconciled_stream_deltas; int64_t reconciled_transport_window = remote_transport_window_size_; int64_t reconciled_initial_window = remote_initial_window_size_; + std::vector inflight_send_initial_windows; for (const auto& id_stream : streams_) { reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta; } @@ -365,6 +369,8 @@ void FlowControlFuzzer::AssertNoneStuck() const { for (const auto& send_to_remote : send_to_remote_) { if (send_to_remote.initial_window_size.has_value()) { reconciled_initial_window = *send_to_remote.initial_window_size; + inflight_send_initial_windows.push_back( + *send_to_remote.initial_window_size); } reconciled_transport_window += send_to_remote.transport_window_update; for (const auto& stream_update : send_to_remote.stream_window_updates) { @@ -381,6 +387,14 @@ void FlowControlFuzzer::AssertNoneStuck() const { } } + // If we're sending an initial window size we get to consider a queued initial + // window size too: it'll be sent as soon as the remote acks the settings + // change, which it must. + if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) { + reconciled_initial_window = *queued_initial_window_size_; + inflight_send_initial_windows.push_back(*queued_initial_window_size_); + } + // Finally, if a stream has indicated it's willing to read, the reconciled // remote *MUST* be in a state where it could send at least one byte. for (const auto& id_stream : streams_) { @@ -396,6 +410,10 @@ void FlowControlFuzzer::AssertNoneStuck() const { reconciled_stream_deltas[id_stream.first], reconciled_initial_window, (id_stream.second.fc.min_progress_size())); + fprintf(stderr, + "initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n", + remote_initial_window_size_, + absl::StrJoin(inflight_send_initial_windows, ",").c_str()); abort(); } } diff --git a/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-31a50b8ef9ccb9a81da0f55f5a77683967102625 b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-31a50b8ef9ccb9a81da0f55f5a77683967102625 new file mode 100644 index 00000000000..0aa2e051e30 --- /dev/null +++ b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-31a50b8ef9ccb9a81da0f55f5a77683967102625 @@ -0,0 +1,108 @@ +enable_bdp: true +actions { + set_min_progress_size { + id: 11264 + size: 2883584 + } +} +actions { + set_pending_size { + id: 1593844738 + size: 11264 + } +} +actions { + set_memory_quota: 0 +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + stream_write { + id: 11264 + size: 2883584 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + read_send_from_remote { + } +} +actions { + step_time_ms: 474140931328 +} +actions { + read_send_from_remote { + } +} +actions { + read_send_to_remote { + } +} +actions { + read_send_from_remote { + } +} +actions { + read_send_from_remote { + } +} +actions { + set_memory_quota: 474140901376 +} +actions { + read_send_to_remote { + } +} +actions { + allocate_memory: 1572864 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + set_min_progress_size { + id: 1593844738 + size: 11264 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + read_send_from_remote { + } +} +actions { + periodic_update { + } +} diff --git a/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-66533e16501fe7e153ecc9c74d296bd4984979e2 b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-66533e16501fe7e153ecc9c74d296bd4984979e2 new file mode 100644 index 00000000000..b9a7a503383 --- /dev/null +++ b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-66533e16501fe7e153ecc9c74d296bd4984979e2 @@ -0,0 +1,103 @@ +enable_bdp: true +actions { + perform_send_from_remote { + } +} +actions { + periodic_update { + } +} +actions { + set_memory_quota: 0 +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + stream_write { + id: 2883584 + size: 2883584 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + read_send_from_remote { + } +} +actions { + read_send_to_remote { + } +} +actions { + read_send_from_remote { + } +} +actions { + read_send_from_remote { + } +} +actions { + set_memory_quota: 474140901376 +} +actions { + read_send_to_remote { + } +} +actions { + allocate_memory: 1572864 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + set_min_progress_size { + id: 1593844738 + size: 11264 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} diff --git a/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-7c7c95a473a7ff617a23f87eea1537e0ee40eff6 b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-7c7c95a473a7ff617a23f87eea1537e0ee40eff6 new file mode 100644 index 00000000000..ee8f0adce47 --- /dev/null +++ b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-7c7c95a473a7ff617a23f87eea1537e0ee40eff6 @@ -0,0 +1,100 @@ +enable_bdp: true +actions { + perform_send_to_remote { + } +} +actions { + stream_write { + size: 6356992 + } +} +actions { + set_memory_quota: 0 +} +actions { + step_time_ms: 474140901376 +} +actions { + set_min_progress_size { + size: 285147136 + } +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 0 +} +actions { + periodic_update { + } +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { +} +actions { + periodic_update { + } +} +actions { + read_send_from_remote { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + set_memory_quota: 474140901376 +} +actions { + read_send_to_remote { + } +} +actions { + allocate_memory: 1572864 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 28261 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 18014398509481984 +} +actions { + periodic_update { + } +} +actions { + set_min_progress_size { + id: 9 + size: 11264 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} diff --git a/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-dba053769d617a54fa52edfbd30e70441a4a6a49 b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-dba053769d617a54fa52edfbd30e70441a4a6a49 new file mode 100644 index 00000000000..0fe37d23685 --- /dev/null +++ b/test/core/transport/chttp2/flow_control_fuzzer_corpus/crash-dba053769d617a54fa52edfbd30e70441a4a6a49 @@ -0,0 +1,100 @@ +enable_bdp: true +actions { + set_min_progress_size { + size: 33554432 + } +} +actions { + stream_write { + size: 1633746944 + } +} +actions { + set_memory_quota: 0 +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + stream_write { + id: 2883584 + size: 2883584 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + periodic_update { + } +} +actions { + read_send_from_remote { + } +} +actions { +} +actions { + periodic_update { + } +} +actions { + read_send_from_remote { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + set_memory_quota: 474140901376 +} +actions { + periodic_update { + } +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + set_min_progress_size { + id: 1593844738 + size: 11264 + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} +actions { + step_time_ms: 474140901376 +} +actions { + periodic_update { + } +} diff --git a/test/core/transport/chttp2/flow_control_test.cc b/test/core/transport/chttp2/flow_control_test.cc index 99ea0d923e0..985ee002482 100644 --- a/test/core/transport/chttp2/flow_control_test.cc +++ b/test/core/transport/chttp2/flow_control_test.cc @@ -169,7 +169,7 @@ TEST(FlowControl, RecvData) { int64_t prev_preferred_rx_frame_size = tfc.target_preferred_rx_crypto_frame_size(); EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(1024)); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); EXPECT_EQ(tfc.announced_window(), 65535 - 1024); EXPECT_EQ(sfc.announced_window_delta(), -1024); EXPECT_EQ(tfc.target_preferred_rx_crypto_frame_size(), @@ -183,31 +183,31 @@ TEST(FlowControl, TrackMinProgressSize) { { StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); sfc_upd.SetMinProgressSize(5); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); } EXPECT_EQ(sfc.min_progress_size(), 5); { StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); sfc_upd.SetMinProgressSize(10); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); } EXPECT_EQ(sfc.min_progress_size(), 10); { StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); } EXPECT_EQ(sfc.min_progress_size(), 5); { StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); } EXPECT_EQ(sfc.min_progress_size(), 0); { StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); - sfc_upd.MakeAction(); + std::ignore = sfc_upd.MakeAction(); } EXPECT_EQ(sfc.min_progress_size(), 0); }