From ddc33d7863e0e14c9e21aadc98c5fcd82291a898 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 13 Feb 2025 14:19:41 -0800 Subject: [PATCH] [chaotic-good] Fix fuzzer found bugs (#38712) Fix several bugs that all contributed to this test failing: 1. a set of overflow/underflow conditions in fuzzing event engine if no timers were set 2. gigabytes of payload being presented to a fuzzer causing it to time out 3. the server fuzzer not cancelling calls, leading to some calls being stuck during an invalid shutdown sequence 4. a bug in chaotic good whereby a call could get stuck writing a payload forever Additionally, I took the time to improve the debug-ability of stuck reads in fuzzing event engine somewhat - since that was a frustrating experience in looking at this. Closes #38712 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38712 from ctiller:f1 c818031d1ce13788f29841d4e3f6304399cdc1b0 PiperOrigin-RevId: 726634838 --- .../chaotic_good/server_transport.cc | 5 +- test/core/end2end/fuzzers/fuzzing_common.cc | 1 + test/core/end2end/fuzzers/network_input.cc | 3 +- test/core/end2end/fuzzers/server_fuzzer.cc | 68 +++++++++++++++++++ .../fuzzing_event_engine.cc | 34 +++++++--- .../fuzzing_event_engine.h | 46 ++++++++----- 6 files changed, 127 insertions(+), 30 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 039d531093b..5c81fed260d 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -102,7 +102,8 @@ auto ChaoticGoodServerTransport::DispatchFrame( return stream->call.SpawnWaitable( "push-frame", [this, stream, frame = std::move(frame), transport = std::move(transport)]() mutable { - return TrySeq( + auto& call = stream->call; + return call.UntilCallCompletes(TrySeq( frame.Payload(), [transport = std::move(transport), header = frame.header()](SliceBuffer payload) { @@ -114,7 +115,7 @@ auto ChaoticGoodServerTransport::DispatchFrame( return Map(call.CancelIfFails(PushFrameIntoCall( std::move(stream), std::move(frame))), [](auto) { return absl::OkStatus(); }); - }); + })); }); }, []() { return absl::OkStatus(); }); diff --git a/test/core/end2end/fuzzers/fuzzing_common.cc b/test/core/end2end/fuzzers/fuzzing_common.cc index b14478f3763..93335a5c96c 100644 --- a/test/core/end2end/fuzzers/fuzzing_common.cc +++ b/test/core/end2end/fuzzers/fuzzing_common.cc @@ -722,6 +722,7 @@ void BasicFuzzer::TryShutdown() { if (server() != nullptr) { if (!server_shutdown_called()) { ShutdownServer(); + CancelAllCallsIfShutdown(); } if (server_finished_shutting_down()) { DestroyServer(); diff --git a/test/core/end2end/fuzzers/network_input.cc b/test/core/end2end/fuzzers/network_input.cc index 369efe10199..62ba8a97fe4 100644 --- a/test/core/end2end/fuzzers/network_input.cc +++ b/test/core/end2end/fuzzers/network_input.cc @@ -259,7 +259,8 @@ SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) { case fuzzer_input::ChaoticGoodFrame::kPayloadOtherConnectionId: h.payload_connection_id = frame.payload_other_connection_id().connection_id(); - h.payload_length = frame.payload_other_connection_id().length(); + h.payload_length = std::min( + 32 * 1024 * 1024, frame.payload_other_connection_id().length()); break; case fuzzer_input::ChaoticGoodFrame::kSettings: proto_payload(frame.settings()); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index 1a0ec8aceb7..124158babac 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -237,5 +237,73 @@ TEST(ServerFuzzers, Chttp2Regression1) { )pb")); } +TEST(ServerFuzzers, ChaoticGoodRegression2) { + ChaoticGood(ParseTestProto( + R"pb(network_input { + connect_timeout_ms: -1 + endpoint_config { args {} } + } + network_input { + input_segments { + segments { + chaotic_good { + known_type: SETTINGS + server_metadata { + status: 4294967295 + message: "" + unknown_metadata { key: "\363\267\223\200" value: "q" } + unknown_metadata {} + } + } + } + segments { + delay_ms: 2147483647 + chaotic_good { + stream_id: 4294967295 + known_type: CLIENT_INITIAL_METADATA + client_metadata { + path: "\364\217\277\277" + authority: "" + unknown_metadata {} + } + } + } + segments { + chaotic_good { + stream_id: 4294967295 + payload_other_connection_id { + connection_id: 1 + length: 2147483647 + } + } + } + segments { + settings { + ack: true + settings { value: 1 } + } + } + } + } + network_input { + single_read_bytes: "" + connect_delay_ms: -20457793 + connect_timeout_ms: -1 + endpoint_config { + args { + key: "\356\200\200" + resource_quota {} + } + } + } + channel_args { + args { + key: "\001" + resource_quota {} + } + } + )pb")); +} + } // namespace testing } // namespace grpc_core diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index b18a1df407e..4dbb2f7513d 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -140,7 +140,23 @@ void FuzzingEventEngine::Tick(Duration max_time) { if (!tasks_by_time_.empty()) { incr = std::min(incr, tasks_by_time_.begin()->first - now_); } - now_ += std::max(Duration::zero(), incr); + const auto max_incr = + std::numeric_limits< + decltype(now_.time_since_epoch().count())>::max() - + now_.time_since_epoch().count(); + CHECK_GE(max_incr, 0u); + incr = std::max(Duration::zero(), incr); + incr = std::min(incr, Duration(max_incr)); + GRPC_TRACE_LOG(fuzzing_ee_timers, INFO) + << "Tick " + << GRPC_DUMP_ARGS(now_.time_since_epoch().count(), incr.count(), + max_incr); + if (!tasks_by_time_.empty()) { + GRPC_TRACE_LOG(fuzzing_ee_timers, INFO) + << "first time: " + << tasks_by_time_.begin()->first.time_since_epoch().count(); + } + now_ += incr; CHECK_GE(now_.time_since_epoch().count(), 0); // Find newly expired timers. while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) { @@ -386,12 +402,10 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) { bool FuzzingEventEngine::FuzzingEndpoint::Write( absl::AnyInvocable on_writable, SliceBuffer* data, const WriteArgs*) { - GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) - << "START_WRITE[" << middle_.get() << ":" << my_index() - << "]: " << data->Length() << " bytes"; - IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_); grpc_core::global_stats().IncrementSyscallWrite(); grpc_core::MutexLock lock(&*mu_); + IoToken write_token({"WRITE", middle_.get(), my_index(), + &g_fuzzing_event_engine->outstanding_writes_}); CHECK(!middle_->closed[my_index()]); CHECK(!middle_->writing[my_index()]); // If the write succeeds immediately, then we return true. @@ -477,18 +491,17 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() { bool FuzzingEventEngine::FuzzingEndpoint::Read( absl::AnyInvocable on_read, SliceBuffer* buffer, const ReadArgs*) { - GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) - << "START_READ[" << middle_.get() << ":" << my_index() << "]"; buffer->Clear(); - IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_); grpc_core::MutexLock lock(&*mu_); + IoToken read_token({"READ", middle_.get(), my_index(), + &g_fuzzing_event_engine->outstanding_reads_}); CHECK(!middle_->closed[my_index()]); if (middle_->pending[peer_index()].empty()) { // If the endpoint is closed, fail asynchronously. if (middle_->closed[peer_index()]) { g_fuzzing_event_engine->RunLocked( - RunType::kRunAfter, - [read_token, on_read = std::move(on_read)]() mutable { + RunType::kRunAfter, [read_token = std::move(read_token), + on_read = std::move(on_read)]() mutable { on_read(absl::InternalError("Endpoint closed")); }); return false; @@ -628,6 +641,7 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked( const intptr_t id = next_task_id_; ++next_task_id_; Duration delay_taken = Duration::zero(); + when = std::max(when, Duration::zero()); if (run_type != RunType::kExact) { if (!task_delays_.empty()) { delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(), diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index fe7fd6b4ff3..60001c7b527 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -39,6 +39,7 @@ #include "absl/log/log.h" #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/util/no_destruct.h" @@ -128,32 +129,43 @@ class FuzzingEventEngine : public EventEngine { private: class IoToken { public: - IoToken() : refs_(nullptr) {} - explicit IoToken(std::atomic* refs) : refs_(refs) { - refs_->fetch_add(1, std::memory_order_relaxed); + struct Manifest { + absl::string_view operation = "NOTHING"; + void* whom = nullptr; + int part = 0; + std::atomic* refs = nullptr; + }; + + IoToken() : manifest_{} {} + explicit IoToken(Manifest manifest) : manifest_(manifest) { + manifest_.refs->fetch_add(1, std::memory_order_relaxed); + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "START_" << manifest_.operation << " " << manifest_.whom << ":" + << manifest_.part; } ~IoToken() { - if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed); - } - IoToken(const IoToken& other) : refs_(other.refs_) { - if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed); - } - IoToken& operator=(const IoToken& other) { - IoToken copy(other); - Swap(copy); - return *this; + if (manifest_.refs != nullptr) { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "STOP_" << manifest_.operation << " " << manifest_.whom << ":" + << manifest_.part; + manifest_.refs->fetch_sub(1, std::memory_order_relaxed); + } } + IoToken(const IoToken&) = delete; + IoToken& operator=(const IoToken&) = delete; IoToken(IoToken&& other) noexcept - : refs_(std::exchange(other.refs_, nullptr)) {} + : manifest_(std::exchange(other.manifest_, Manifest{})) {} IoToken& operator=(IoToken&& other) noexcept { - if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed); - refs_ = std::exchange(other.refs_, nullptr); + if (manifest_.refs != nullptr) { + manifest_.refs->fetch_sub(1, std::memory_order_relaxed); + } + manifest_ = std::exchange(other.manifest_, Manifest{}); return *this; } - void Swap(IoToken& other) { std::swap(refs_, other.refs_); } + void Swap(IoToken& other) { std::swap(manifest_, other.manifest_); } private: - std::atomic* refs_; + Manifest manifest_; }; enum class RunType {