[fuzzer] Add a fuzzer for SubChannelConnector instances (#37397)

Includes a few changes to pollset stuff to make it easier to not use pollsets (which I think is going to be generally helpful in the coming months)

Closes #37397

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37397 from ctiller:client-chicken 1099cd500a
PiperOrigin-RevId: 660014128
pull/37358/head
Craig Tiller 4 months ago committed by Copybara-Service
parent b95e8dd3b9
commit 3de09c544d
  1. 12
      src/core/lib/iomgr/polling_entity.cc
  2. 57
      test/core/end2end/fuzzers/BUILD
  3. 189
      test/core/end2end/fuzzers/connector_fuzzer.cc
  4. 34
      test/core/end2end/fuzzers/connector_fuzzer.h
  5. 30
      test/core/end2end/fuzzers/connector_fuzzer_chttp2.cc
  6. 1
      test/core/end2end/fuzzers/connector_fuzzer_chttp2_corpus/empty
  7. 36
      test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec.cc
  8. 1
      test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec_corpus/empty
  9. 23
      test/core/end2end/fuzzers/fuzzer_input.proto
  10. 50
      test/core/end2end/fuzzers/network_input.cc
  11. 6
      test/core/end2end/fuzzers/network_input.h
  12. 75
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  13. 39
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h

@ -30,8 +30,12 @@
grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
grpc_pollset_set* pollset_set) {
grpc_polling_entity pollent;
pollent.pollent.pollset_set = pollset_set;
pollent.tag = GRPC_POLLS_POLLSET_SET;
if (pollset_set == nullptr) {
pollent.tag = GRPC_POLLS_NONE;
} else {
pollent.pollent.pollset_set = pollset_set;
pollent.tag = GRPC_POLLS_POLLSET_SET;
}
return pollent;
}
@ -73,6 +77,8 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
CHECK_NE(pollent->pollent.pollset_set, nullptr);
grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set);
} else if (pollent->tag == GRPC_POLLS_NONE) {
// Do nothing.
} else {
grpc_core::Crash(
absl::StrFormat("Invalid grpc_polling_entity tag '%d'", pollent->tag));
@ -93,6 +99,8 @@ void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent,
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
CHECK_NE(pollent->pollent.pollset_set, nullptr);
grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set);
} else if (pollent->tag == GRPC_POLLS_NONE) {
// Do nothing.
} else {
grpc_core::Crash(
absl::StrFormat("Invalid grpc_polling_entity tag '%d'", pollent->tag));

@ -206,3 +206,60 @@ grpc_proto_fuzzer(
"//src/core:chaotic_good_server",
],
)
grpc_cc_library(
name = "connector_fuzzer",
srcs = ["connector_fuzzer.cc"],
hdrs = ["connector_fuzzer.h"],
external_deps = ["absl/log:check"],
deps = [
"fuzzer_input_proto",
"fuzzing_common",
"network_input",
"//:gpr",
"//:grpc",
"//src/core:channel_args",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:fuzz_config_vars",
"//test/core/test_util:grpc_test_util",
"//test/core/test_util:grpc_test_util_base",
],
)
grpc_proto_fuzzer(
name = "connector_fuzzer_chttp2",
srcs = ["connector_fuzzer_chttp2.cc"],
corpus = "connector_fuzzer_chttp2_corpus",
end2end_fuzzer = True,
language = "C++",
proto = None,
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":connector_fuzzer",
"//:grpc",
],
)
grpc_proto_fuzzer(
name = "connector_fuzzer_chttp2_fakesec",
srcs = ["connector_fuzzer_chttp2_fakesec.cc"],
corpus = "connector_fuzzer_chttp2_fakesec_corpus",
end2end_fuzzer = True,
language = "C++",
proto = None,
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":connector_fuzzer",
"//:grpc",
],
)

@ -0,0 +1,189 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/end2end/fuzzers/connector_fuzzer.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/end2end/fuzzers/network_input.h"
#include "test/core/test_util/fuzz_config_vars.h"
#include "test/core/test_util/test_config.h"
bool squelch = true;
bool leak_check = true;
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::FuzzingEventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using ::grpc_event_engine::experimental::MockEndpointController;
using ::grpc_event_engine::experimental::SetEventEngineFactory;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
namespace grpc_core {
namespace {
class ConnectorFuzzer {
public:
ConnectorFuzzer(
const fuzzer_input::Msg& msg,
absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
make_security_connector,
absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector)
: make_security_connector_(make_security_connector),
engine_([actions = msg.event_engine_actions()]() {
SetEventEngineFactory([actions]() -> std::unique_ptr<EventEngine> {
return std::make_unique<FuzzingEventEngine>(
FuzzingEventEngine::Options(), actions);
});
return std::dynamic_pointer_cast<FuzzingEventEngine>(
GetDefaultEventEngine());
}()),
mock_endpoint_controller_(MockEndpointController::Create(engine_)),
connector_(make_connector()) {
CHECK(engine_);
for (const auto& input : msg.network_input()) {
network_inputs_.push(input);
}
grpc_timer_manager_set_start_threaded(false);
grpc_init();
ExecCtx exec_ctx;
Executor::SetThreadingAll(false);
listener_ =
engine_
->CreateListener(
[this](std::unique_ptr<EventEngine::Endpoint> endpoint,
MemoryAllocator) {
if (network_inputs_.empty()) return;
ScheduleWrites(network_inputs_.front(), std::move(endpoint),
engine_.get());
network_inputs_.pop();
},
[](absl::Status) {}, ChannelArgsEndpointConfig(ChannelArgs{}),
std::make_unique<MemoryQuota>("foo"))
.value();
if (msg.has_shutdown_connector() &&
msg.shutdown_connector().delay_ms() > 0) {
auto shutdown_connector = msg.shutdown_connector();
const auto delay = Duration::Milliseconds(shutdown_connector.delay_ms());
engine_->RunAfterExactly(delay, [this, shutdown_connector = std::move(
shutdown_connector)]() {
if (connector_ == nullptr) return;
connector_->Shutdown(absl::Status(
static_cast<absl::StatusCode>(shutdown_connector.shutdown_status()),
shutdown_connector.shutdown_message()));
});
}
// Abbreviated runtime for interpreting API actions, since we simply don't
// support many here.
uint64_t when_ms = 0;
for (const auto& action : msg.api_actions()) {
switch (action.type_case()) {
default:
break;
case api_fuzzer::Action::kSleepMs:
when_ms += action.sleep_ms();
break;
case api_fuzzer::Action::kResizeResourceQuota:
engine_->RunAfterExactly(
Duration::Milliseconds(when_ms),
[this, new_size = action.resize_resource_quota()]() {
resource_quota_->memory_quota()->SetSize(new_size);
});
when_ms += 1;
break;
}
}
}
~ConnectorFuzzer() {
listener_.reset();
connector_.reset();
mock_endpoint_controller_.reset();
engine_->TickUntilIdle();
grpc_shutdown_blocking();
engine_->UnsetGlobalHooks();
}
void Run() {
grpc_resolved_address addr;
CHECK(grpc_parse_uri(URI::Parse("ipv4:127.0.0.1:1234").value(), &addr));
CHECK_OK(
listener_->Bind(URIToResolvedAddress("ipv4:127.0.0.1:1234").value()));
CHECK_OK(listener_->Start());
OrphanablePtr<grpc_endpoint> endpoint(
mock_endpoint_controller_->TakeCEndpoint());
SubchannelConnector::Result result;
bool done = false;
auto channel_args = ChannelArgs{}.SetObject<EventEngine>(engine_).SetObject(
resource_quota_);
auto security_connector = make_security_connector_();
if (security_connector != nullptr) {
channel_args = channel_args.SetObject(std::move(security_connector));
}
connector_->Connect(
SubchannelConnector::Args{&addr, nullptr,
Timestamp::Now() + Duration::Seconds(20),
channel_args},
&result, NewClosure([&done, &result](grpc_error_handle status) {
done = true;
if (status.ok()) result.transport->Orphan();
}));
while (!done) {
engine_->Tick();
grpc_timer_manager_tick();
}
}
private:
RefCountedPtr<ResourceQuota> resource_quota_ =
MakeRefCounted<ResourceQuota>("fuzzer");
absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
make_security_connector_;
std::shared_ptr<FuzzingEventEngine> engine_;
std::queue<fuzzer_input::NetworkInput> network_inputs_;
std::shared_ptr<MockEndpointController> mock_endpoint_controller_;
std::unique_ptr<EventEngine::Listener> listener_;
OrphanablePtr<SubchannelConnector> connector_;
};
} // namespace
void RunConnectorFuzzer(
const fuzzer_input::Msg& msg,
absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
make_security_connector,
absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector) {
if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
grpc_disable_all_absl_logs();
}
static const int once = []() {
ForceEnableExperiment("event_engine_client", true);
ForceEnableExperiment("event_engine_listener", true);
return 42;
}();
CHECK_EQ(once, 42); // avoid unused variable warning
ApplyFuzzConfigVars(msg.config_vars());
TestOnlyReloadExperimentsFromConfigVariables();
ConnectorFuzzer(msg, make_security_connector, make_connector).Run();
}
} // namespace grpc_core

@ -0,0 +1,34 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H
#define GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H
#include "absl/functional/function_ref.h"
#include "src/core/client_channel/connector.h"
#include "src/core/lib/security/security_connector/security_connector.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
namespace grpc_core {
void RunConnectorFuzzer(
const fuzzer_input::Msg& msg,
absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
make_security_connector,
absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector);
}
#endif // GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H

@ -0,0 +1,30 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/credentials.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/fuzzers/connector_fuzzer.h"
DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
grpc_core::RunConnectorFuzzer(
msg,
[]() {
return grpc_core::RefCountedPtr<grpc_channel_security_connector>();
},
[]() { return grpc_core::MakeOrphanable<grpc_core::Chttp2Connector>(); });
}

@ -0,0 +1,36 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/credentials.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/security/security_connector/fake/fake_security_connector.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/fuzzers/connector_fuzzer.h"
DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
grpc_core::RunConnectorFuzzer(
msg,
[]() {
return grpc_fake_channel_security_connector_create(
grpc_core::RefCountedPtr<grpc_channel_credentials>(
grpc_fake_transport_security_credentials_create()),
nullptr, "foobar", grpc_core::ChannelArgs{});
},
[]() { return grpc_core::MakeOrphanable<grpc_core::Chttp2Connector>(); });
}

@ -172,6 +172,20 @@ message ChaoticGoodFrame {
message ChaoticGoodSettings {}
message FakeTransportFrame {
enum MessageString {
CLIENT_INIT = 0;
SERVER_INIT = 1;
CLIENT_FINISHED = 2;
SERVER_FINISHED = 3;
}
oneof payload {
bytes raw_bytes = 1;
MessageString message_string = 2;
}
}
message InputSegment {
int32 delay_ms = 1;
oneof payload {
@ -187,6 +201,7 @@ message InputSegment {
H2ClientPrefix client_prefix = 11;
uint32 repeated_zeros = 12;
ChaoticGoodFrame chaotic_good = 13;
FakeTransportFrame fake_transport_frame = 14;
}
}
@ -204,10 +219,18 @@ message NetworkInput {
}
}
// Only for connector fuzzer, when to drop the connector
message ShutdownConnector {
int32 delay_ms = 1;
int32 shutdown_status = 2;
string shutdown_message = 3;
}
message Msg {
repeated NetworkInput network_input = 1;
repeated api_fuzzer.Action api_actions = 2;
fuzzing_event_engine.Actions event_engine_actions = 3;
grpc.testing.FuzzConfigVars config_vars = 4;
grpc.testing.FuzzingChannelArgs channel_args = 5;
ShutdownConnector shutdown_connector = 6;
}

@ -267,6 +267,13 @@ SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
return out;
}
void store32_little_endian(uint32_t value, unsigned char* buf) {
buf[3] = static_cast<unsigned char>((value >> 24) & 0xFF);
buf[2] = static_cast<unsigned char>((value >> 16) & 0xFF);
buf[1] = static_cast<unsigned char>((value >> 8) & 0xFF);
buf[0] = static_cast<unsigned char>((value) & 0xFF);
}
grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
switch (segment.payload_case()) {
case fuzzer_input::InputSegment::kRawBytes:
@ -333,6 +340,38 @@ grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
.JoinIntoSlice()
.TakeCSlice();
} break;
case fuzzer_input::InputSegment::kFakeTransportFrame: {
auto generate = [](absl::string_view payload) {
uint32_t length = payload.length();
std::vector<unsigned char> bytes;
bytes.resize(4);
store32_little_endian(length + 4, bytes.data());
for (auto c : payload) {
bytes.push_back(static_cast<unsigned char>(c));
}
return grpc_slice_from_copied_buffer(
reinterpret_cast<const char*>(bytes.data()), bytes.size());
};
switch (segment.fake_transport_frame().payload_case()) {
case fuzzer_input::FakeTransportFrame::kRawBytes:
return generate(segment.fake_transport_frame().raw_bytes());
case fuzzer_input::FakeTransportFrame::kMessageString:
switch (segment.fake_transport_frame().message_string()) {
default:
return generate("UNKNOWN");
case fuzzer_input::FakeTransportFrame::CLIENT_INIT:
return generate("CLIENT_INIT");
case fuzzer_input::FakeTransportFrame::SERVER_INIT:
return generate("SERVER_INIT");
case fuzzer_input::FakeTransportFrame::CLIENT_FINISHED:
return generate("CLIENT_FINISHED");
case fuzzer_input::FakeTransportFrame::SERVER_FINISHED:
return generate("SERVER_FINISHED");
}
case fuzzer_input::FakeTransportFrame::PAYLOAD_NOT_SET:
return generate("");
}
}
case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
break;
}
@ -545,4 +584,15 @@ Duration ScheduleConnection(
return delay;
}
void ScheduleWrites(
const fuzzer_input::NetworkInput& network_input,
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
auto schedule = MakeSchedule(network_input);
auto ep = std::shared_ptr<EventEngine::Endpoint>(std::move(endpoint));
ReadForever(ep);
ScheduleWritesForReads(ep, event_engine, std::move(schedule));
}
} // namespace grpc_core

@ -30,6 +30,12 @@ Duration ScheduleReads(
mock_endpoint_controller,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine);
void ScheduleWrites(
const fuzzer_input::NetworkInput& network_input,
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine);
Duration ScheduleConnection(
const fuzzer_input::NetworkInput& network_input,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,

@ -18,6 +18,7 @@
#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <limits>
#include <vector>
@ -32,6 +33,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/telemetry/stats.h"
@ -189,7 +191,15 @@ void FuzzingEventEngine::TickUntilIdle() {
while (true) {
{
grpc_core::MutexLock lock(&*mu_);
if (tasks_by_id_.empty()) return;
LOG_EVERY_N_SEC(INFO, 5)
<< "TickUntilIdle: "
<< GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(),
outstanding_writes_.load());
if (tasks_by_id_.empty() &&
outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
outstanding_reads_.load(std::memory_order_relaxed) == 0) {
return;
}
}
Tick();
}
@ -299,6 +309,9 @@ absl::Status FuzzingEventEngine::FuzzingListener::Start() {
bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
CHECK(!closed[index]);
const int peer_index = 1 - index;
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "WRITE[" << this << ":" << index << "]: entry "
<< GRPC_DUMP_ARGS(data->Length());
if (data->Length() == 0) return true;
size_t write_len = std::numeric_limits<size_t>::max();
// Check the write_sizes queue for fuzzer imposed restrictions on this write
@ -315,12 +328,16 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
// byte.
if (write_len == 0) write_len = 1;
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "WRITE[" << this << ":" << index << "]: " << write_len << " bytes";
<< "WRITE[" << this << ":" << index << "]: " << write_len << " bytes; "
<< GRPC_DUMP_ARGS(pending_read[peer_index].has_value());
// Expand the pending buffer.
size_t prev_len = pending[index].size();
pending[index].resize(prev_len + write_len);
// Move bytes from the to-write data into the pending buffer.
data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len);
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "WRITE[" << this << ":" << index << "]: post-move "
<< GRPC_DUMP_ARGS(data->Length());
// If there was a pending read, then we can fulfill it.
if (pending_read[peer_index].has_value()) {
pending_read[peer_index]->buffer->Append(
@ -328,7 +345,11 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
pending[index].clear();
g_fuzzing_event_engine->RunLocked(
RunType::kWrite,
[cb = std::move(pending_read[peer_index]->on_read)]() mutable {
[cb = std::move(pending_read[peer_index]->on_read), this, peer_index,
buffer = pending_read[peer_index]->buffer]() mutable {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "FINISH_READ[" << this << ":" << peer_index
<< "]: " << GRPC_DUMP_ARGS(buffer->Length());
cb(absl::OkStatus());
});
pending_read[peer_index].reset();
@ -339,6 +360,10 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
bool FuzzingEventEngine::FuzzingEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs*) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "START_WRITE[" << middle_.get() << ":" << my_index()
<< "]: " << data->Length() << " bytes";
IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_);
grpc_core::global_stats().IncrementSyscallWrite();
grpc_core::MutexLock lock(&*mu_);
CHECK(!middle_->closed[my_index()]);
@ -346,24 +371,38 @@ bool FuzzingEventEngine::FuzzingEndpoint::Write(
// If the write succeeds immediately, then we return true.
if (middle_->Write(data, my_index())) return true;
middle_->writing[my_index()] = true;
ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data);
ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data,
std::move(write_token));
return false;
}
void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
std::shared_ptr<EndpointMiddle> middle, int index,
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
IoToken write_token) {
g_fuzzing_event_engine->RunLocked(
RunType::kWrite, [middle = std::move(middle), index, data,
on_writable = std::move(on_writable)]() mutable {
RunType::kWrite,
[write_token = std::move(write_token), middle = std::move(middle), index,
data, on_writable = std::move(on_writable)]() mutable {
grpc_core::ReleasableMutexLock lock(&*mu_);
CHECK(middle->writing[index]);
if (middle->closed[index]) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "CLOSED[" << middle.get() << ":" << index << "]";
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter,
[on_writable = std::move(on_writable)]() mutable {
on_writable(absl::InternalError("Endpoint closed"));
});
if (middle->pending_read[1 - index].has_value()) {
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter,
[cb = std::move(
middle->pending_read[1 - index]->on_read)]() mutable {
cb(absl::InternalError("Endpoint closed"));
});
middle->pending_read[1 - index].reset();
}
return;
}
if (middle->Write(data, index)) {
@ -373,14 +412,23 @@ void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
return;
}
ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable),
data);
data, std::move(write_token));
});
}
FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
grpc_core::MutexLock lock(&*mu_);
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "CLOSE[" << middle_.get() << ":" << my_index() << "]: "
<< GRPC_DUMP_ARGS(
middle_->closed[my_index()], middle_->closed[peer_index()],
middle_->pending_read[my_index()].has_value(),
middle_->pending_read[peer_index()].has_value(),
middle_->writing[my_index()], middle_->writing[peer_index()]);
middle_->closed[my_index()] = true;
if (middle_->pending_read[my_index()].has_value()) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "CLOSED_READING[" << middle_.get() << ":" << my_index() << "]";
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter,
[cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable {
@ -388,7 +436,7 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
});
middle_->pending_read[my_index()].reset();
}
if (!middle_->writing[peer_index()] &&
if (!middle_->writing[my_index()] &&
middle_->pending_read[peer_index()].has_value()) {
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter,
@ -403,20 +451,25 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
bool FuzzingEventEngine::FuzzingEndpoint::Read(
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs*) {
GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
<< "START_READ[" << middle_.get() << ":" << my_index() << "]";
buffer->Clear();
IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_);
grpc_core::MutexLock lock(&*mu_);
CHECK(!middle_->closed[my_index()]);
if (middle_->pending[peer_index()].empty()) {
// If the endpoint is closed, fail asynchronously.
if (middle_->closed[peer_index()]) {
g_fuzzing_event_engine->RunLocked(
RunType::kRunAfter, [on_read = std::move(on_read)]() mutable {
RunType::kRunAfter,
[read_token, on_read = std::move(on_read)]() mutable {
on_read(absl::InternalError("Endpoint closed"));
});
return false;
}
// If the endpoint has no pending data, then we need to wait for a write.
middle_->pending_read[my_index()] = PendingRead{std::move(on_read), buffer};
middle_->pending_read[my_index()] =
PendingRead{std::move(read_token), std::move(on_read), buffer};
return false;
} else {
// If the endpoint has pending data, then we can fulfill the read

@ -17,6 +17,7 @@
#include <stddef.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
@ -124,6 +125,36 @@ class FuzzingEventEngine : public EventEngine {
}
private:
class IoToken {
public:
IoToken() : refs_(nullptr) {}
explicit IoToken(std::atomic<size_t>* refs) : refs_(refs) {
refs_->fetch_add(1, std::memory_order_relaxed);
}
~IoToken() {
if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
}
IoToken(const IoToken& other) : refs_(other.refs_) {
if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed);
}
IoToken& operator=(const IoToken& other) {
IoToken copy(other);
Swap(copy);
return *this;
}
IoToken(IoToken&& other) noexcept
: refs_(std::exchange(other.refs_, nullptr)) {}
IoToken& operator=(IoToken&& other) noexcept {
if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
refs_ = std::exchange(other.refs_, nullptr);
return *this;
}
void Swap(IoToken& other) { std::swap(refs_, other.refs_); }
private:
std::atomic<size_t>* refs_;
};
enum class RunType {
kWrite,
kRunAfter,
@ -183,6 +214,8 @@ class FuzzingEventEngine : public EventEngine {
// One read that's outstanding.
struct PendingRead {
// The associated io token
IoToken io_token;
// Callback to invoke when the read completes.
absl::AnyInvocable<void(absl::Status)> on_read;
// The buffer to read into.
@ -243,8 +276,8 @@ class FuzzingEventEngine : public EventEngine {
// endpoint shutdown, it's believed this is a legal implementation.
static void ScheduleDelayedWrite(
std::shared_ptr<EndpointMiddle> middle, int index,
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
IoToken write_token) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
const std::shared_ptr<EndpointMiddle> middle_;
const int index_;
};
@ -299,6 +332,8 @@ class FuzzingEventEngine : public EventEngine {
std::queue<std::queue<size_t>> write_sizes_for_future_connections_
ABSL_GUARDED_BY(mu_);
grpc_pick_port_functions previous_pick_port_functions_;
std::atomic<size_t> outstanding_writes_{0};
std::atomic<size_t> outstanding_reads_{0};
grpc_core::Mutex run_after_duration_callback_mu_;
absl::AnyInvocable<void(Duration)> run_after_duration_callback_

Loading…
Cancel
Save