diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 98b42841820..b1d62f41419 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -364,6 +364,7 @@ class PromiseActivity final // Can't safely run, so ask to run later. wakeup_scheduler_.ScheduleWakeup(this); } else { + // Already a wakeup scheduled for later, drop ref. WakeupComplete(); } } diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 972fc677785..838f21ad1b1 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -278,6 +278,12 @@ grpc_proto_fuzzer( tags = ["no_windows"], uses_polling = False, deps = [ + "//:activity", + "//:join", + "//:map", + "//:promise", + "//:race", + "//:seq", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/promise/promise_fuzzer.cc b/test/core/promise/promise_fuzzer.cc index ac12411bf99..78a0da9e3a8 100644 --- a/test/core/promise/promise_fuzzer.cc +++ b/test/core/promise/promise_fuzzer.cc @@ -12,10 +12,306 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/promise/race.h" +#include "src/core/lib/promise/seq.h" #include "src/libfuzzer/libfuzzer_macro.h" #include "test/core/promise/promise_fuzzer.pb.h" bool squelch = true; bool leak_check = true; -DEFINE_PROTO_FUZZER(const promise_fuzzer::Msg&) {} +namespace grpc_core { +// Return type for infallible promises. +// We choose this so that it's easy to construct, and will trigger asan failures +// if misused, and is copyable. +using IntHdl = std::shared_ptr; + +template +using PromiseFactory = std::function(T)>; + +namespace { +class Fuzzer { + public: + void Run(const promise_fuzzer::Msg& msg) { + // If there's no promise we can't construct and activity and... we're done. + if (!msg.has_promise()) { + return; + } + // Construct activity. + activity_ = MakeActivity( + [msg, this] { + return Seq(MakePromise(msg.promise()), + [] { return absl::OkStatus(); }); + }, + Scheduler{this}, + [this](absl::Status status) { + // Must only be called once + GPR_ASSERT(!done_); + // If we became certain of the eventual status, verify it. + if (expected_status_.has_value()) { + GPR_ASSERT(status == *expected_status_); + } + // Mark ourselves done. + done_ = true; + }); + for (int i = 0; !done_ && activity_ != nullptr && i < msg.actions_size(); + i++) { + // Do some things + const auto& action = msg.actions(i); + switch (action.action_type_case()) { + // Force a wakeup + case promise_fuzzer::Action::kForceWakeup: + activity_->ForceWakeup(); + break; + // Cancel from the outside + case promise_fuzzer::Action::kCancel: + ExpectCancelled(); + activity_.reset(); + break; + // Flush any pending wakeups + case promise_fuzzer::Action::kFlushWakeup: + if (wakeup_ != nullptr) absl::exchange(wakeup_, nullptr)(); + break; + // Drop some wakeups (external system closed?) + case promise_fuzzer::Action::kDropWaker: { + int n = action.drop_waker(); + auto v = std::move(wakers_[n]); + wakers_.erase(n); + break; + } + // Wakeup some wakeups + case promise_fuzzer::Action::kAwakeWaker: { + int n = action.awake_waker(); + auto v = std::move(wakers_[n]); + wakers_.erase(n); + for (auto& w : v) { + w.Wakeup(); + } + break; + } + case promise_fuzzer::Action::ACTION_TYPE_NOT_SET: + break; + } + } + ExpectCancelled(); + activity_.reset(); + if (wakeup_ != nullptr) absl::exchange(wakeup_, nullptr)(); + GPR_ASSERT(done_); + } + + private: + // Schedule wakeups against the fuzzer + struct Scheduler { + Fuzzer* fuzzer; + // Schedule a wakeup + template + void ScheduleWakeup(ActivityType* activity) { + GPR_ASSERT(activity == fuzzer->activity_.get()); + GPR_ASSERT(fuzzer->wakeup_ == nullptr); + fuzzer->wakeup_ = [activity]() { activity->RunScheduledWakeup(); }; + } + }; + + // We know that if not already finished, the status when finished will be + // cancelled. + void ExpectCancelled() { + if (!done_ && !expected_status_.has_value()) { + expected_status_ = absl::CancelledError(); + } + } + + // Construct a promise factory from a protobuf + PromiseFactory MakePromiseFactory( + const promise_fuzzer::PromiseFactory& p) { + switch (p.promise_factory_type_case()) { + case promise_fuzzer::PromiseFactory::kPromise: + return [p, this](IntHdl) { return MakePromise(p.promise()); }; + case promise_fuzzer::PromiseFactory::kLast: + return [](IntHdl h) { return [h]() { return h; }; }; + case promise_fuzzer::PromiseFactory::PROMISE_FACTORY_TYPE_NOT_SET: + break; + } + return [](IntHdl) { + return []() -> Poll { return std::make_shared(42); }; + }; + } + + // Construct a promise from a protobuf + Promise MakePromise(const promise_fuzzer::Promise& p) { + switch (p.promise_type_case()) { + case promise_fuzzer::Promise::kSeq: + switch (p.seq().promise_factories_size()) { + case 1: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0))); + case 2: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0)), + MakePromiseFactory(p.seq().promise_factories(1))); + case 3: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0)), + MakePromiseFactory(p.seq().promise_factories(1)), + MakePromiseFactory(p.seq().promise_factories(2))); + case 4: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0)), + MakePromiseFactory(p.seq().promise_factories(1)), + MakePromiseFactory(p.seq().promise_factories(2)), + MakePromiseFactory(p.seq().promise_factories(3))); + case 5: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0)), + MakePromiseFactory(p.seq().promise_factories(1)), + MakePromiseFactory(p.seq().promise_factories(2)), + MakePromiseFactory(p.seq().promise_factories(3)), + MakePromiseFactory(p.seq().promise_factories(4))); + case 6: + return Seq(MakePromise(p.seq().first()), + MakePromiseFactory(p.seq().promise_factories(0)), + MakePromiseFactory(p.seq().promise_factories(1)), + MakePromiseFactory(p.seq().promise_factories(2)), + MakePromiseFactory(p.seq().promise_factories(3)), + MakePromiseFactory(p.seq().promise_factories(4)), + MakePromiseFactory(p.seq().promise_factories(5))); + } + break; + case promise_fuzzer::Promise::kJoin: + switch (p.join().promises_size()) { + case 1: + return Map(Join(MakePromise(p.join().promises(0))), + [](std::tuple t) { return std::get<0>(t); }); + case 2: + return Map( + Join(MakePromise(p.join().promises(0)), + MakePromise(p.join().promises(1))), + [](std::tuple t) { return std::get<0>(t); }); + case 3: + return Map(Join(MakePromise(p.join().promises(0)), + MakePromise(p.join().promises(1)), + MakePromise(p.join().promises(2))), + [](std::tuple t) { + return std::get<0>(t); + }); + case 4: + return Map(Join(MakePromise(p.join().promises(0)), + MakePromise(p.join().promises(1)), + MakePromise(p.join().promises(2)), + MakePromise(p.join().promises(3))), + [](std::tuple t) { + return std::get<0>(t); + }); + case 5: + return Map( + Join(MakePromise(p.join().promises(0)), + MakePromise(p.join().promises(1)), + MakePromise(p.join().promises(2)), + MakePromise(p.join().promises(3)), + MakePromise(p.join().promises(4))), + [](std::tuple t) { + return std::get<0>(t); + }); + case 6: + return Map( + Join(MakePromise(p.join().promises(0)), + MakePromise(p.join().promises(1)), + MakePromise(p.join().promises(2)), + MakePromise(p.join().promises(3)), + MakePromise(p.join().promises(4)), + MakePromise(p.join().promises(5))), + [](std::tuple + t) { return std::get<0>(t); }); + } + break; + case promise_fuzzer::Promise::kRace: + switch (p.race().promises_size()) { + case 1: + return Race(MakePromise(p.race().promises(0))); + case 2: + return Race(MakePromise(p.race().promises(0)), + MakePromise(p.race().promises(1))); + case 3: + return Race(MakePromise(p.race().promises(0)), + MakePromise(p.race().promises(1)), + MakePromise(p.race().promises(2))); + case 4: + return Race(MakePromise(p.race().promises(0)), + MakePromise(p.race().promises(1)), + MakePromise(p.race().promises(2)), + MakePromise(p.race().promises(3))); + case 5: + return Race(MakePromise(p.race().promises(0)), + MakePromise(p.race().promises(1)), + MakePromise(p.race().promises(2)), + MakePromise(p.race().promises(3)), + MakePromise(p.race().promises(4))); + case 6: + return Race(MakePromise(p.race().promises(0)), + MakePromise(p.race().promises(1)), + MakePromise(p.race().promises(2)), + MakePromise(p.race().promises(3)), + MakePromise(p.race().promises(4)), + MakePromise(p.race().promises(5))); + } + break; + case promise_fuzzer::Promise::kNever: + return Never(); + case promise_fuzzer::Promise::kSleepFirstN: { + int n = p.sleep_first_n(); + return [n]() mutable -> Poll { + if (n <= 0) return std::make_shared(0); + n--; + return Pending{}; + }; + } + case promise_fuzzer::Promise::kCancelFromInside: + return [this]() -> Poll { + this->activity_.reset(); + return Pending{}; + }; + case promise_fuzzer::Promise::kWaitOnceOnWaker: { + bool called = false; + auto config = p.wait_once_on_waker(); + return [this, config, called]() mutable -> Poll { + if (!called) { + if (config.owning()) { + wakers_[config.waker()].push_back( + Activity::current()->MakeOwningWaker()); + } else { + wakers_[config.waker()].push_back( + Activity::current()->MakeNonOwningWaker()); + } + return Pending(); + } + return std::make_shared(3); + }; + } + case promise_fuzzer::Promise::PromiseTypeCase::PROMISE_TYPE_NOT_SET: + break; + } + return [] { return std::make_shared(42); }; + } + + // Activity under test + ActivityPtr activity_; + // Scheduled wakeup (may be nullptr if no wakeup scheduled) + std::function wakeup_; + // If we are certain of the final status, then that. Otherwise, nullopt if we + // don't know. + absl::optional expected_status_; + // Has on_done been called? + bool done_ = false; + // Wakers that may be scheduled + std::map> wakers_; +}; +} // namespace + +} // namespace grpc_core + +DEFINE_PROTO_FUZZER(const promise_fuzzer::Msg& msg) { + grpc_core::Fuzzer().Run(msg); +} diff --git a/test/core/promise/promise_fuzzer.proto b/test/core/promise/promise_fuzzer.proto index 90a72582685..3b236d8df5f 100644 --- a/test/core/promise/promise_fuzzer.proto +++ b/test/core/promise/promise_fuzzer.proto @@ -16,5 +16,76 @@ syntax = "proto3"; package promise_fuzzer; +message Seq { + Promise first = 1; + repeated PromiseFactory promise_factories = 2; +} + +message Join { + repeated Promise promises = 1; +} + +message Race { + repeated Promise promises = 1; +} + +message Last {} + +message PromiseFactory { + oneof promise_factory_type { + // Return a specific promise + Promise promise = 1; + // Return the result of the last thing + Last last = 2; + } +} + +message Never {} + +message ScheduleWaker { + bool owning = 1; + int32 waker = 2; +} + +message Promise { + oneof promise_type { + // Seq combinator + Seq seq = 1; + // Join combinator + Join join = 2; + // Race combinator + Race race = 3; + // Never complete + Never never = 4; + // Sleep n times, then wakeup + int32 sleep_first_n = 5; + // Cancel and be pending + Cancel cancel_from_inside = 6; + // Wait for waker n, then continue + ScheduleWaker wait_once_on_waker = 7; + } +} + +message Cancel {} + +message Wakeup {} + +message Action { + oneof action_type { + // Activity::ForceWakeup + Wakeup force_wakeup = 1; + // Cancel the activity + Cancel cancel = 2; + // Flush any pending scheduled wakeups + Wakeup flush_wakeup = 3; + // Awake waker n if it exists + int32 awake_waker = 4; + // Drop waker n if it exists + int32 drop_waker = 5; + } +} + message Msg { + Promise promise = 1; + repeated Action actions = 2; } diff --git a/test/core/promise/promise_fuzzer_corpus/crash-4758aec0c3e382d4fb3e398d634d8da72ed0d574 b/test/core/promise/promise_fuzzer_corpus/crash-4758aec0c3e382d4fb3e398d634d8da72ed0d574 new file mode 100644 index 00000000000..9b9d8c0f473 --- /dev/null +++ b/test/core/promise/promise_fuzzer_corpus/crash-4758aec0c3e382d4fb3e398d634d8da72ed0d574 @@ -0,0 +1,20 @@ +promise { + race { + promises { + cancel_from_inside { + } + } + promises { + sleep_first_n: 0 + } + promises { + sleep_first_n: 2097151999 + } + promises { + } + } +} +actions { + flush_wakeup { + } +} diff --git a/test/core/promise/promise_fuzzer_corpus/leak-90b749c0ca79c7a11d25f4b6f0594af99fe4a78f b/test/core/promise/promise_fuzzer_corpus/leak-90b749c0ca79c7a11d25f4b6f0594af99fe4a78f new file mode 100644 index 00000000000..623431d4c6a --- /dev/null +++ b/test/core/promise/promise_fuzzer_corpus/leak-90b749c0ca79c7a11d25f4b6f0594af99fe4a78f @@ -0,0 +1,12 @@ +promise { + never { + } +} +actions { + force_wakeup { + } +} +actions { + force_wakeup { + } +}