diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc index 42233112c15..8b225444c7e 100644 --- a/src/core/lib/iomgr/polling_entity.cc +++ b/src/core/lib/iomgr/polling_entity.cc @@ -30,8 +30,12 @@ grpc_polling_entity grpc_polling_entity_create_from_pollset_set( grpc_pollset_set* pollset_set) { grpc_polling_entity pollent; - pollent.pollent.pollset_set = pollset_set; - pollent.tag = GRPC_POLLS_POLLSET_SET; + if (pollset_set == nullptr) { + pollent.tag = GRPC_POLLS_NONE; + } else { + pollent.pollent.pollset_set = pollset_set; + pollent.tag = GRPC_POLLS_POLLSET_SET; + } return pollent; } @@ -73,6 +77,8 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { CHECK_NE(pollent->pollent.pollset_set, nullptr); grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set); + } else if (pollent->tag == GRPC_POLLS_NONE) { + // Do nothing. } else { grpc_core::Crash( absl::StrFormat("Invalid grpc_polling_entity tag '%d'", pollent->tag)); @@ -93,6 +99,8 @@ void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent, } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { CHECK_NE(pollent->pollent.pollset_set, nullptr); grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set); + } else if (pollent->tag == GRPC_POLLS_NONE) { + // Do nothing. } else { grpc_core::Crash( absl::StrFormat("Invalid grpc_polling_entity tag '%d'", pollent->tag)); diff --git a/test/core/end2end/fuzzers/BUILD b/test/core/end2end/fuzzers/BUILD index 8848569e8ab..a0ed90bc804 100644 --- a/test/core/end2end/fuzzers/BUILD +++ b/test/core/end2end/fuzzers/BUILD @@ -206,3 +206,60 @@ grpc_proto_fuzzer( "//src/core:chaotic_good_server", ], ) + +grpc_cc_library( + name = "connector_fuzzer", + srcs = ["connector_fuzzer.cc"], + hdrs = ["connector_fuzzer.h"], + external_deps = ["absl/log:check"], + deps = [ + "fuzzer_input_proto", + "fuzzing_common", + "network_input", + "//:gpr", + "//:grpc", + "//src/core:channel_args", + "//test/core/event_engine/fuzzing_event_engine", + "//test/core/test_util:fuzz_config_vars", + "//test/core/test_util:grpc_test_util", + "//test/core/test_util:grpc_test_util_base", + ], +) + +grpc_proto_fuzzer( + name = "connector_fuzzer_chttp2", + srcs = ["connector_fuzzer_chttp2.cc"], + corpus = "connector_fuzzer_chttp2_corpus", + end2end_fuzzer = True, + language = "C++", + proto = None, + tags = [ + "no_mac", + "no_windows", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + ":connector_fuzzer", + "//:grpc", + ], +) + +grpc_proto_fuzzer( + name = "connector_fuzzer_chttp2_fakesec", + srcs = ["connector_fuzzer_chttp2_fakesec.cc"], + corpus = "connector_fuzzer_chttp2_fakesec_corpus", + end2end_fuzzer = True, + language = "C++", + proto = None, + tags = [ + "no_mac", + "no_windows", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + ":connector_fuzzer", + "//:grpc", + ], +) diff --git a/test/core/end2end/fuzzers/connector_fuzzer.cc b/test/core/end2end/fuzzers/connector_fuzzer.cc new file mode 100644 index 00000000000..5c230c49297 --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer.cc @@ -0,0 +1,189 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/end2end/fuzzers/connector_fuzzer.h" + +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/env.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "test/core/end2end/fuzzers/fuzzer_input.pb.h" +#include "test/core/end2end/fuzzers/network_input.h" +#include "test/core/test_util/fuzz_config_vars.h" +#include "test/core/test_util/test_config.h" + +bool squelch = true; +bool leak_check = true; + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::FuzzingEventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::MockEndpointController; +using ::grpc_event_engine::experimental::SetEventEngineFactory; +using ::grpc_event_engine::experimental::URIToResolvedAddress; + +namespace grpc_core { +namespace { + +class ConnectorFuzzer { + public: + ConnectorFuzzer( + const fuzzer_input::Msg& msg, + absl::FunctionRef()> + make_security_connector, + absl::FunctionRef()> make_connector) + : make_security_connector_(make_security_connector), + engine_([actions = msg.event_engine_actions()]() { + SetEventEngineFactory([actions]() -> std::unique_ptr { + return std::make_unique( + FuzzingEventEngine::Options(), actions); + }); + return std::dynamic_pointer_cast( + GetDefaultEventEngine()); + }()), + mock_endpoint_controller_(MockEndpointController::Create(engine_)), + connector_(make_connector()) { + CHECK(engine_); + for (const auto& input : msg.network_input()) { + network_inputs_.push(input); + } + grpc_timer_manager_set_start_threaded(false); + grpc_init(); + ExecCtx exec_ctx; + Executor::SetThreadingAll(false); + listener_ = + engine_ + ->CreateListener( + [this](std::unique_ptr endpoint, + MemoryAllocator) { + if (network_inputs_.empty()) return; + ScheduleWrites(network_inputs_.front(), std::move(endpoint), + engine_.get()); + network_inputs_.pop(); + }, + [](absl::Status) {}, ChannelArgsEndpointConfig(ChannelArgs{}), + std::make_unique("foo")) + .value(); + if (msg.has_shutdown_connector() && + msg.shutdown_connector().delay_ms() > 0) { + auto shutdown_connector = msg.shutdown_connector(); + const auto delay = Duration::Milliseconds(shutdown_connector.delay_ms()); + engine_->RunAfterExactly(delay, [this, shutdown_connector = std::move( + shutdown_connector)]() { + if (connector_ == nullptr) return; + connector_->Shutdown(absl::Status( + static_cast(shutdown_connector.shutdown_status()), + shutdown_connector.shutdown_message())); + }); + } + // Abbreviated runtime for interpreting API actions, since we simply don't + // support many here. + uint64_t when_ms = 0; + for (const auto& action : msg.api_actions()) { + switch (action.type_case()) { + default: + break; + case api_fuzzer::Action::kSleepMs: + when_ms += action.sleep_ms(); + break; + case api_fuzzer::Action::kResizeResourceQuota: + engine_->RunAfterExactly( + Duration::Milliseconds(when_ms), + [this, new_size = action.resize_resource_quota()]() { + resource_quota_->memory_quota()->SetSize(new_size); + }); + when_ms += 1; + break; + } + } + } + + ~ConnectorFuzzer() { + listener_.reset(); + connector_.reset(); + mock_endpoint_controller_.reset(); + engine_->TickUntilIdle(); + grpc_shutdown_blocking(); + engine_->UnsetGlobalHooks(); + } + + void Run() { + grpc_resolved_address addr; + CHECK(grpc_parse_uri(URI::Parse("ipv4:127.0.0.1:1234").value(), &addr)); + CHECK_OK( + listener_->Bind(URIToResolvedAddress("ipv4:127.0.0.1:1234").value())); + CHECK_OK(listener_->Start()); + OrphanablePtr endpoint( + mock_endpoint_controller_->TakeCEndpoint()); + SubchannelConnector::Result result; + bool done = false; + auto channel_args = ChannelArgs{}.SetObject(engine_).SetObject( + resource_quota_); + auto security_connector = make_security_connector_(); + if (security_connector != nullptr) { + channel_args = channel_args.SetObject(std::move(security_connector)); + } + connector_->Connect( + SubchannelConnector::Args{&addr, nullptr, + Timestamp::Now() + Duration::Seconds(20), + channel_args}, + &result, NewClosure([&done, &result](grpc_error_handle status) { + done = true; + if (status.ok()) result.transport->Orphan(); + })); + + while (!done) { + engine_->Tick(); + grpc_timer_manager_tick(); + } + } + + private: + RefCountedPtr resource_quota_ = + MakeRefCounted("fuzzer"); + absl::FunctionRef()> + make_security_connector_; + std::shared_ptr engine_; + std::queue network_inputs_; + std::shared_ptr mock_endpoint_controller_; + std::unique_ptr listener_; + OrphanablePtr connector_; +}; + +} // namespace + +void RunConnectorFuzzer( + const fuzzer_input::Msg& msg, + absl::FunctionRef()> + make_security_connector, + absl::FunctionRef()> make_connector) { + if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) { + grpc_disable_all_absl_logs(); + } + static const int once = []() { + ForceEnableExperiment("event_engine_client", true); + ForceEnableExperiment("event_engine_listener", true); + return 42; + }(); + CHECK_EQ(once, 42); // avoid unused variable warning + ApplyFuzzConfigVars(msg.config_vars()); + TestOnlyReloadExperimentsFromConfigVariables(); + ConnectorFuzzer(msg, make_security_connector, make_connector).Run(); +} + +} // namespace grpc_core diff --git a/test/core/end2end/fuzzers/connector_fuzzer.h b/test/core/end2end/fuzzers/connector_fuzzer.h new file mode 100644 index 00000000000..64b78aeb0bf --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer.h @@ -0,0 +1,34 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H +#define GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H + +#include "absl/functional/function_ref.h" + +#include "src/core/client_channel/connector.h" +#include "src/core/lib/security/security_connector/security_connector.h" +#include "test/core/end2end/fuzzers/fuzzer_input.pb.h" + +namespace grpc_core { + +void RunConnectorFuzzer( + const fuzzer_input::Msg& msg, + absl::FunctionRef()> + make_security_connector, + absl::FunctionRef()> make_connector); + +} + +#endif // GRPC_TEST_CORE_END2END_FUZZERS_CONNECTOR_FUZZER_H diff --git a/test/core/end2end/fuzzers/connector_fuzzer_chttp2.cc b/test/core/end2end/fuzzers/connector_fuzzer_chttp2.cc new file mode 100644 index 00000000000..4c3e531189f --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer_chttp2.cc @@ -0,0 +1,30 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/end2end/fuzzers/connector_fuzzer.h" + +DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) { + grpc_core::RunConnectorFuzzer( + msg, + []() { + return grpc_core::RefCountedPtr(); + }, + []() { return grpc_core::MakeOrphanable(); }); +} diff --git a/test/core/end2end/fuzzers/connector_fuzzer_chttp2_corpus/empty b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_corpus/empty new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_corpus/empty @@ -0,0 +1 @@ + diff --git a/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec.cc b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec.cc new file mode 100644 index 00000000000..aaccced6543 --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec.cc @@ -0,0 +1,36 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/core/lib/security/security_connector/fake/fake_security_connector.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/end2end/fuzzers/connector_fuzzer.h" + +DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) { + grpc_core::RunConnectorFuzzer( + msg, + []() { + return grpc_fake_channel_security_connector_create( + grpc_core::RefCountedPtr( + grpc_fake_transport_security_credentials_create()), + nullptr, "foobar", grpc_core::ChannelArgs{}); + }, + []() { return grpc_core::MakeOrphanable(); }); +} diff --git a/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec_corpus/empty b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec_corpus/empty new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/test/core/end2end/fuzzers/connector_fuzzer_chttp2_fakesec_corpus/empty @@ -0,0 +1 @@ + diff --git a/test/core/end2end/fuzzers/fuzzer_input.proto b/test/core/end2end/fuzzers/fuzzer_input.proto index 17d89f1e627..32b1c5d4436 100644 --- a/test/core/end2end/fuzzers/fuzzer_input.proto +++ b/test/core/end2end/fuzzers/fuzzer_input.proto @@ -172,6 +172,20 @@ message ChaoticGoodFrame { message ChaoticGoodSettings {} +message FakeTransportFrame { + enum MessageString { + CLIENT_INIT = 0; + SERVER_INIT = 1; + CLIENT_FINISHED = 2; + SERVER_FINISHED = 3; + } + + oneof payload { + bytes raw_bytes = 1; + MessageString message_string = 2; + } +} + message InputSegment { int32 delay_ms = 1; oneof payload { @@ -187,6 +201,7 @@ message InputSegment { H2ClientPrefix client_prefix = 11; uint32 repeated_zeros = 12; ChaoticGoodFrame chaotic_good = 13; + FakeTransportFrame fake_transport_frame = 14; } } @@ -204,10 +219,18 @@ message NetworkInput { } } +// Only for connector fuzzer, when to drop the connector +message ShutdownConnector { + int32 delay_ms = 1; + int32 shutdown_status = 2; + string shutdown_message = 3; +} + message Msg { repeated NetworkInput network_input = 1; repeated api_fuzzer.Action api_actions = 2; fuzzing_event_engine.Actions event_engine_actions = 3; grpc.testing.FuzzConfigVars config_vars = 4; grpc.testing.FuzzingChannelArgs channel_args = 5; + ShutdownConnector shutdown_connector = 6; } diff --git a/test/core/end2end/fuzzers/network_input.cc b/test/core/end2end/fuzzers/network_input.cc index 8aef97ea9c6..0afd8d1b95d 100644 --- a/test/core/end2end/fuzzers/network_input.cc +++ b/test/core/end2end/fuzzers/network_input.cc @@ -267,6 +267,13 @@ SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) { return out; } +void store32_little_endian(uint32_t value, unsigned char* buf) { + buf[3] = static_cast((value >> 24) & 0xFF); + buf[2] = static_cast((value >> 16) & 0xFF); + buf[1] = static_cast((value >> 8) & 0xFF); + buf[0] = static_cast((value) & 0xFF); +} + grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) { switch (segment.payload_case()) { case fuzzer_input::InputSegment::kRawBytes: @@ -333,6 +340,38 @@ grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) { .JoinIntoSlice() .TakeCSlice(); } break; + case fuzzer_input::InputSegment::kFakeTransportFrame: { + auto generate = [](absl::string_view payload) { + uint32_t length = payload.length(); + std::vector bytes; + bytes.resize(4); + store32_little_endian(length + 4, bytes.data()); + for (auto c : payload) { + bytes.push_back(static_cast(c)); + } + return grpc_slice_from_copied_buffer( + reinterpret_cast(bytes.data()), bytes.size()); + }; + switch (segment.fake_transport_frame().payload_case()) { + case fuzzer_input::FakeTransportFrame::kRawBytes: + return generate(segment.fake_transport_frame().raw_bytes()); + case fuzzer_input::FakeTransportFrame::kMessageString: + switch (segment.fake_transport_frame().message_string()) { + default: + return generate("UNKNOWN"); + case fuzzer_input::FakeTransportFrame::CLIENT_INIT: + return generate("CLIENT_INIT"); + case fuzzer_input::FakeTransportFrame::SERVER_INIT: + return generate("SERVER_INIT"); + case fuzzer_input::FakeTransportFrame::CLIENT_FINISHED: + return generate("CLIENT_FINISHED"); + case fuzzer_input::FakeTransportFrame::SERVER_FINISHED: + return generate("SERVER_FINISHED"); + } + case fuzzer_input::FakeTransportFrame::PAYLOAD_NOT_SET: + return generate(""); + } + } case fuzzer_input::InputSegment::PAYLOAD_NOT_SET: break; } @@ -545,4 +584,15 @@ Duration ScheduleConnection( return delay; } +void ScheduleWrites( + const fuzzer_input::NetworkInput& network_input, + std::unique_ptr + endpoint, + grpc_event_engine::experimental::FuzzingEventEngine* event_engine) { + auto schedule = MakeSchedule(network_input); + auto ep = std::shared_ptr(std::move(endpoint)); + ReadForever(ep); + ScheduleWritesForReads(ep, event_engine, std::move(schedule)); +} + } // namespace grpc_core diff --git a/test/core/end2end/fuzzers/network_input.h b/test/core/end2end/fuzzers/network_input.h index a0e72d434f0..afb6490d81d 100644 --- a/test/core/end2end/fuzzers/network_input.h +++ b/test/core/end2end/fuzzers/network_input.h @@ -30,6 +30,12 @@ Duration ScheduleReads( mock_endpoint_controller, grpc_event_engine::experimental::FuzzingEventEngine* event_engine); +void ScheduleWrites( + const fuzzer_input::NetworkInput& network_input, + std::unique_ptr + endpoint, + grpc_event_engine::experimental::FuzzingEventEngine* event_engine); + Duration ScheduleConnection( const fuzzer_input::NetworkInput& network_input, grpc_event_engine::experimental::FuzzingEventEngine* event_engine, diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index e67009b8a5b..4dec816f1e9 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/dump_args.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/port.h" #include "src/core/telemetry/stats.h" @@ -189,7 +191,15 @@ void FuzzingEventEngine::TickUntilIdle() { while (true) { { grpc_core::MutexLock lock(&*mu_); - if (tasks_by_id_.empty()) return; + LOG_EVERY_N_SEC(INFO, 5) + << "TickUntilIdle: " + << GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(), + outstanding_writes_.load()); + if (tasks_by_id_.empty() && + outstanding_writes_.load(std::memory_order_relaxed) == 0 && + outstanding_reads_.load(std::memory_order_relaxed) == 0) { + return; + } } Tick(); } @@ -299,6 +309,9 @@ absl::Status FuzzingEventEngine::FuzzingListener::Start() { bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) { CHECK(!closed[index]); const int peer_index = 1 - index; + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "WRITE[" << this << ":" << index << "]: entry " + << GRPC_DUMP_ARGS(data->Length()); if (data->Length() == 0) return true; size_t write_len = std::numeric_limits::max(); // Check the write_sizes queue for fuzzer imposed restrictions on this write @@ -315,12 +328,16 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) { // byte. if (write_len == 0) write_len = 1; GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) - << "WRITE[" << this << ":" << index << "]: " << write_len << " bytes"; + << "WRITE[" << this << ":" << index << "]: " << write_len << " bytes; " + << GRPC_DUMP_ARGS(pending_read[peer_index].has_value()); // Expand the pending buffer. size_t prev_len = pending[index].size(); pending[index].resize(prev_len + write_len); // Move bytes from the to-write data into the pending buffer. data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len); + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "WRITE[" << this << ":" << index << "]: post-move " + << GRPC_DUMP_ARGS(data->Length()); // If there was a pending read, then we can fulfill it. if (pending_read[peer_index].has_value()) { pending_read[peer_index]->buffer->Append( @@ -328,7 +345,11 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) { pending[index].clear(); g_fuzzing_event_engine->RunLocked( RunType::kWrite, - [cb = std::move(pending_read[peer_index]->on_read)]() mutable { + [cb = std::move(pending_read[peer_index]->on_read), this, peer_index, + buffer = pending_read[peer_index]->buffer]() mutable { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "FINISH_READ[" << this << ":" << peer_index + << "]: " << GRPC_DUMP_ARGS(buffer->Length()); cb(absl::OkStatus()); }); pending_read[peer_index].reset(); @@ -339,6 +360,10 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) { bool FuzzingEventEngine::FuzzingEndpoint::Write( absl::AnyInvocable on_writable, SliceBuffer* data, const WriteArgs*) { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "START_WRITE[" << middle_.get() << ":" << my_index() + << "]: " << data->Length() << " bytes"; + IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_); grpc_core::global_stats().IncrementSyscallWrite(); grpc_core::MutexLock lock(&*mu_); CHECK(!middle_->closed[my_index()]); @@ -346,24 +371,38 @@ bool FuzzingEventEngine::FuzzingEndpoint::Write( // If the write succeeds immediately, then we return true. if (middle_->Write(data, my_index())) return true; middle_->writing[my_index()] = true; - ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data); + ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data, + std::move(write_token)); return false; } void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite( std::shared_ptr middle, int index, - absl::AnyInvocable on_writable, SliceBuffer* data) { + absl::AnyInvocable on_writable, SliceBuffer* data, + IoToken write_token) { g_fuzzing_event_engine->RunLocked( - RunType::kWrite, [middle = std::move(middle), index, data, - on_writable = std::move(on_writable)]() mutable { + RunType::kWrite, + [write_token = std::move(write_token), middle = std::move(middle), index, + data, on_writable = std::move(on_writable)]() mutable { grpc_core::ReleasableMutexLock lock(&*mu_); CHECK(middle->writing[index]); if (middle->closed[index]) { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "CLOSED[" << middle.get() << ":" << index << "]"; g_fuzzing_event_engine->RunLocked( RunType::kRunAfter, [on_writable = std::move(on_writable)]() mutable { on_writable(absl::InternalError("Endpoint closed")); }); + if (middle->pending_read[1 - index].has_value()) { + g_fuzzing_event_engine->RunLocked( + RunType::kRunAfter, + [cb = std::move( + middle->pending_read[1 - index]->on_read)]() mutable { + cb(absl::InternalError("Endpoint closed")); + }); + middle->pending_read[1 - index].reset(); + } return; } if (middle->Write(data, index)) { @@ -373,14 +412,23 @@ void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite( return; } ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable), - data); + data, std::move(write_token)); }); } FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() { grpc_core::MutexLock lock(&*mu_); + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "CLOSE[" << middle_.get() << ":" << my_index() << "]: " + << GRPC_DUMP_ARGS( + middle_->closed[my_index()], middle_->closed[peer_index()], + middle_->pending_read[my_index()].has_value(), + middle_->pending_read[peer_index()].has_value(), + middle_->writing[my_index()], middle_->writing[peer_index()]); middle_->closed[my_index()] = true; if (middle_->pending_read[my_index()].has_value()) { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "CLOSED_READING[" << middle_.get() << ":" << my_index() << "]"; g_fuzzing_event_engine->RunLocked( RunType::kRunAfter, [cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable { @@ -388,7 +436,7 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() { }); middle_->pending_read[my_index()].reset(); } - if (!middle_->writing[peer_index()] && + if (!middle_->writing[my_index()] && middle_->pending_read[peer_index()].has_value()) { g_fuzzing_event_engine->RunLocked( RunType::kRunAfter, @@ -403,20 +451,25 @@ FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() { bool FuzzingEventEngine::FuzzingEndpoint::Read( absl::AnyInvocable on_read, SliceBuffer* buffer, const ReadArgs*) { + GRPC_TRACE_LOG(fuzzing_ee_writes, INFO) + << "START_READ[" << middle_.get() << ":" << my_index() << "]"; buffer->Clear(); + IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_); grpc_core::MutexLock lock(&*mu_); CHECK(!middle_->closed[my_index()]); if (middle_->pending[peer_index()].empty()) { // If the endpoint is closed, fail asynchronously. if (middle_->closed[peer_index()]) { g_fuzzing_event_engine->RunLocked( - RunType::kRunAfter, [on_read = std::move(on_read)]() mutable { + RunType::kRunAfter, + [read_token, on_read = std::move(on_read)]() mutable { on_read(absl::InternalError("Endpoint closed")); }); return false; } // If the endpoint has no pending data, then we need to wait for a write. - middle_->pending_read[my_index()] = PendingRead{std::move(on_read), buffer}; + middle_->pending_read[my_index()] = + PendingRead{std::move(read_token), std::move(on_read), buffer}; return false; } else { // If the endpoint has pending data, then we can fulfill the read diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index f28cdca5251..93aa42c3563 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -124,6 +125,36 @@ class FuzzingEventEngine : public EventEngine { } private: + class IoToken { + public: + IoToken() : refs_(nullptr) {} + explicit IoToken(std::atomic* refs) : refs_(refs) { + refs_->fetch_add(1, std::memory_order_relaxed); + } + ~IoToken() { + if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed); + } + IoToken(const IoToken& other) : refs_(other.refs_) { + if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed); + } + IoToken& operator=(const IoToken& other) { + IoToken copy(other); + Swap(copy); + return *this; + } + IoToken(IoToken&& other) noexcept + : refs_(std::exchange(other.refs_, nullptr)) {} + IoToken& operator=(IoToken&& other) noexcept { + if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed); + refs_ = std::exchange(other.refs_, nullptr); + return *this; + } + void Swap(IoToken& other) { std::swap(refs_, other.refs_); } + + private: + std::atomic* refs_; + }; + enum class RunType { kWrite, kRunAfter, @@ -183,6 +214,8 @@ class FuzzingEventEngine : public EventEngine { // One read that's outstanding. struct PendingRead { + // The associated io token + IoToken io_token; // Callback to invoke when the read completes. absl::AnyInvocable on_read; // The buffer to read into. @@ -243,8 +276,8 @@ class FuzzingEventEngine : public EventEngine { // endpoint shutdown, it's believed this is a legal implementation. static void ScheduleDelayedWrite( std::shared_ptr middle, int index, - absl::AnyInvocable on_writable, SliceBuffer* data) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + absl::AnyInvocable on_writable, SliceBuffer* data, + IoToken write_token) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); const std::shared_ptr middle_; const int index_; }; @@ -299,6 +332,8 @@ class FuzzingEventEngine : public EventEngine { std::queue> write_sizes_for_future_connections_ ABSL_GUARDED_BY(mu_); grpc_pick_port_functions previous_pick_port_functions_; + std::atomic outstanding_writes_{0}; + std::atomic outstanding_reads_{0}; grpc_core::Mutex run_after_duration_callback_mu_; absl::AnyInvocable run_after_duration_callback_