[chaotic-good] Fix fuzzer found bugs (#38712)

Fix several bugs that all contributed to this test failing:
1. a set of overflow/underflow conditions in fuzzing event engine if no timers were set
2. gigabytes of payload being presented to a fuzzer causing it to time out
3. the server fuzzer not cancelling calls, leading to some calls being stuck during an invalid shutdown sequence
4. a bug in chaotic good whereby a call could get stuck writing a payload forever

Additionally, I took the time to improve the debug-ability of stuck reads in fuzzing event engine somewhat - since that was a frustrating experience in looking at this.

Closes #38712

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38712 from ctiller:f1 c818031d1c
PiperOrigin-RevId: 726634838
pull/38738/head
Craig Tiller 1 week ago committed by Copybara-Service
parent 1bc1cffda8
commit ddc33d7863
  1. 5
      src/core/ext/transport/chaotic_good/server_transport.cc
  2. 1
      test/core/end2end/fuzzers/fuzzing_common.cc
  3. 3
      test/core/end2end/fuzzers/network_input.cc
  4. 68
      test/core/end2end/fuzzers/server_fuzzer.cc
  5. 34
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  6. 46
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h

@ -102,7 +102,8 @@ auto ChaoticGoodServerTransport::DispatchFrame(
return stream->call.SpawnWaitable(
"push-frame", [this, stream, frame = std::move(frame),
transport = std::move(transport)]() mutable {
return TrySeq(
auto& call = stream->call;
return call.UntilCallCompletes(TrySeq(
frame.Payload(),
[transport = std::move(transport),
header = frame.header()](SliceBuffer payload) {
@ -114,7 +115,7 @@ auto ChaoticGoodServerTransport::DispatchFrame(
return Map(call.CancelIfFails(PushFrameIntoCall(
std::move(stream), std::move(frame))),
[](auto) { return absl::OkStatus(); });
});
}));
});
},
[]() { return absl::OkStatus(); });

@ -722,6 +722,7 @@ void BasicFuzzer::TryShutdown() {
if (server() != nullptr) {
if (!server_shutdown_called()) {
ShutdownServer();
CancelAllCallsIfShutdown();
}
if (server_finished_shutting_down()) {
DestroyServer();

@ -259,7 +259,8 @@ SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
case fuzzer_input::ChaoticGoodFrame::kPayloadOtherConnectionId:
h.payload_connection_id =
frame.payload_other_connection_id().connection_id();
h.payload_length = frame.payload_other_connection_id().length();
h.payload_length = std::min<uint32_t>(
32 * 1024 * 1024, frame.payload_other_connection_id().length());
break;
case fuzzer_input::ChaoticGoodFrame::kSettings:
proto_payload(frame.settings());

@ -237,5 +237,73 @@ TEST(ServerFuzzers, Chttp2Regression1) {
)pb"));
}
TEST(ServerFuzzers, ChaoticGoodRegression2) {
ChaoticGood(ParseTestProto(
R"pb(network_input {
connect_timeout_ms: -1
endpoint_config { args {} }
}
network_input {
input_segments {
segments {
chaotic_good {
known_type: SETTINGS
server_metadata {
status: 4294967295
message: ""
unknown_metadata { key: "\363\267\223\200" value: "q" }
unknown_metadata {}
}
}
}
segments {
delay_ms: 2147483647
chaotic_good {
stream_id: 4294967295
known_type: CLIENT_INITIAL_METADATA
client_metadata {
path: "\364\217\277\277"
authority: ""
unknown_metadata {}
}
}
}
segments {
chaotic_good {
stream_id: 4294967295
payload_other_connection_id {
connection_id: 1
length: 2147483647
}
}
}
segments {
settings {
ack: true
settings { value: 1 }
}
}
}
}
network_input {
single_read_bytes: ""
connect_delay_ms: -20457793
connect_timeout_ms: -1
endpoint_config {
args {
key: "\356\200\200"
resource_quota {}
}
}
}
channel_args {
args {
key: "\001"
resource_quota {}
}
}
)pb"));
}
} // namespace testing
} // namespace grpc_core

@ -140,7 +140,23 @@ void FuzzingEventEngine::Tick(Duration max_time) {
if (!tasks_by_time_.empty()) {
incr = std::min(incr, tasks_by_time_.begin()->first - now_);
}
now_ += std::max(Duration::zero(), incr);
const auto max_incr =
std::numeric_limits<
decltype(now_.time_since_epoch().count())>::max() -
now_.time_since_epoch().count();
CHECK_GE(max_incr, 0u);
incr = std::max(Duration::zero(), incr);
incr = std::min(incr, Duration(max_incr));
GRPC_TRACE_LOG(fuzzing_ee_timers, INFO)
<< "Tick "
<< GRPC_DUMP_ARGS(now_.time_since_epoch().count(), incr.count(),
max_incr);
if (!tasks_by_time_.empty()) {
GRPC_TRACE_LOG(fuzzing_ee_timers, INFO)
<< "first time: "
<< tasks_by_time_.begin()->first.time_since_epoch().count();
}
now_ += incr;
CHECK_GE(now_.time_since_epoch().count(), 0);
// Find newly expired timers.
while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
@ -386,12 +402,10 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
bool FuzzingEventEngine::FuzzingEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs*) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "START_WRITE[" << middle_.get() << ":" << my_index()
<< "]: " << data->Length() << " bytes";
IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_);
grpc_core::global_stats().IncrementSyscallWrite();
grpc_core::MutexLock lock(&*mu_);
IoToken write_token({"WRITE", middle_.get(), my_index(),
&g_fuzzing_event_engine->outstanding_writes_});
CHECK(!middle_->closed[my_index()]);
CHECK(!middle_->writing[my_index()]);
// If the write succeeds immediately, then we return true.
@ -477,18 +491,17 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
bool FuzzingEventEngine::FuzzingEndpoint::Read(
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs*) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "START_READ[" << middle_.get() << ":" << my_index() << "]";
buffer->Clear();
IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_);
grpc_core::MutexLock lock(&*mu_);
IoToken read_token({"READ", middle_.get(), my_index(),
&g_fuzzing_event_engine->outstanding_reads_});
CHECK(!middle_->closed[my_index()]);
if (middle_->pending[peer_index()].empty()) {
// If the endpoint is closed, fail asynchronously.
if (middle_->closed[peer_index()]) {
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter,
[read_token, on_read = std::move(on_read)]() mutable {
RunType::kRunAfter, [read_token = std::move(read_token),
on_read = std::move(on_read)]() mutable {
on_read(absl::InternalError("Endpoint closed"));
});
return false;
@ -628,6 +641,7 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
const intptr_t id = next_task_id_;
++next_task_id_;
Duration delay_taken = Duration::zero();
when = std::max(when, Duration::zero());
if (run_type != RunType::kExact) {
if (!task_delays_.empty()) {
delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),

@ -39,6 +39,7 @@
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/util/no_destruct.h"
@ -128,32 +129,43 @@ class FuzzingEventEngine : public EventEngine {
private:
class IoToken {
public:
IoToken() : refs_(nullptr) {}
explicit IoToken(std::atomic<size_t>* refs) : refs_(refs) {
refs_->fetch_add(1, std::memory_order_relaxed);
struct Manifest {
absl::string_view operation = "NOTHING";
void* whom = nullptr;
int part = 0;
std::atomic<size_t>* refs = nullptr;
};
IoToken() : manifest_{} {}
explicit IoToken(Manifest manifest) : manifest_(manifest) {
manifest_.refs->fetch_add(1, std::memory_order_relaxed);
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "START_" << manifest_.operation << " " << manifest_.whom << ":"
<< manifest_.part;
}
~IoToken() {
if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
}
IoToken(const IoToken& other) : refs_(other.refs_) {
if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed);
}
IoToken& operator=(const IoToken& other) {
IoToken copy(other);
Swap(copy);
return *this;
if (manifest_.refs != nullptr) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "STOP_" << manifest_.operation << " " << manifest_.whom << ":"
<< manifest_.part;
manifest_.refs->fetch_sub(1, std::memory_order_relaxed);
}
}
IoToken(const IoToken&) = delete;
IoToken& operator=(const IoToken&) = delete;
IoToken(IoToken&& other) noexcept
: refs_(std::exchange(other.refs_, nullptr)) {}
: manifest_(std::exchange(other.manifest_, Manifest{})) {}
IoToken& operator=(IoToken&& other) noexcept {
if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
refs_ = std::exchange(other.refs_, nullptr);
if (manifest_.refs != nullptr) {
manifest_.refs->fetch_sub(1, std::memory_order_relaxed);
}
manifest_ = std::exchange(other.manifest_, Manifest{});
return *this;
}
void Swap(IoToken& other) { std::swap(refs_, other.refs_); }
void Swap(IoToken& other) { std::swap(manifest_, other.manifest_); }
private:
std::atomic<size_t>* refs_;
Manifest manifest_;
};
enum class RunType {

Loading…
Cancel
Save