[fuzzing] Fix bug on endpoint shutdown whereby we leave read requests dangling (#33406)

Fix fuzzer found bug b/286716972

Follows up on https://github.com/grpc/grpc/pull/33266 but gets the edge
case right of when there's a read queued before the peer closes - in
that case we weren't waking up the read.
pull/33273/head
Craig Tiller 2 years ago committed by GitHub
parent 969c228934
commit 2055cce132
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 12
      test/core/end2end/end2end_test_corpus/bad_ping/clusterfuzz-testcase-minimized-bad_ping_fuzzer-4826792586182656.test
  3. 22
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  4. 2
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h

@ -1648,6 +1648,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "%p CANCEL PINGS: %s", t, error.ToString().c_str()));
// callback remaining pings: they're not allowed to call into the transport,
// and maybe they hold resources that need to be freed
grpc_chttp2_ping_queue* pq = &t->ping_queue;
@ -2976,8 +2978,9 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, "transport %p set connectivity_state=%d; status=%s; reason=%s",
t, state, status.ToString().c_str(), reason));
t->state_tracker.SetState(state, status, reason);
}

@ -0,0 +1,12 @@
test_id: 3211264
event_engine_actions {
connections {
write_size: 3211264
write_size: 3211264
write_size: 3211264
write_size: 3211264
write_size: 128
write_size: 4194363
write_size: 0
}
}

@ -307,16 +307,11 @@ bool FuzzingEventEngine::FuzzingEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs*) {
grpc_core::MutexLock lock(&*mu_);
// If the endpoint is closed, then we fail the write.
if (middle_->closed[my_index()]) {
g_fuzzing_event_engine->RunLocked(
[on_writable = std::move(on_writable)]() mutable {
on_writable(absl::InternalError("Endpoint closed"));
});
return false;
}
GPR_ASSERT(!middle_->closed[my_index()]);
GPR_ASSERT(!middle_->writing[my_index()]);
// If the write succeeds immediately, then we return true.
if (middle_->Write(data, my_index())) return true;
middle_->writing[my_index()] = true;
ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data);
return false;
}
@ -328,6 +323,7 @@ void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
[middle = std::move(middle), index, data,
on_writable = std::move(on_writable)]() mutable {
grpc_core::ReleasableMutexLock lock(&*mu_);
GPR_ASSERT(middle->writing[index]);
if (middle->closed[index]) {
g_fuzzing_event_engine->RunLocked(
[on_writable = std::move(on_writable)]() mutable {
@ -336,6 +332,7 @@ void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
return;
}
if (middle->Write(data, index)) {
middle->writing[index] = false;
lock.Release();
on_writable(absl::OkStatus());
return;
@ -355,6 +352,15 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
});
middle_->pending_read[my_index()].reset();
}
if (!middle_->writing[peer_index()] &&
middle_->pending_read[peer_index()].has_value()) {
g_fuzzing_event_engine->RunLocked(
[cb = std::move(
middle_->pending_read[peer_index()]->on_read)]() mutable {
cb(absl::InternalError("Endpoint closed"));
});
middle_->pending_read[peer_index()].reset();
}
}
bool FuzzingEventEngine::FuzzingEndpoint::Read(

@ -173,6 +173,8 @@ class FuzzingEventEngine : public EventEngine {
const ResolvedAddress addrs[2];
// Is the endpoint closed?
bool closed[2] ABSL_GUARDED_BY(mu_) = {false, false};
// Is the endpoint writing?
bool writing[2] ABSL_GUARDED_BY(mu_) = {false, false};
// Bytes written into each endpoint and awaiting a read.
std::vector<uint8_t> pending[2] ABSL_GUARDED_BY(mu_);
// The sizes of each accepted write, as determined by the fuzzer actions.

Loading…
Cancel
Save