From 4d7ad5271a33027d99499da13966683ee085d55d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Sep 2021 13:27:52 -0700 Subject: [PATCH] Handle cancellation inside polling an activity (#27369) * allow cancellation during run * add a test * review feedback --- src/core/lib/promise/activity.h | 55 +++++++++++++++++++++++++----- test/core/promise/activity_test.cc | 50 +++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 371491552de..50fadbd09fa 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -122,7 +122,9 @@ class Activity : private Wakeable { // Wakeup the current threads activity - will force a subsequent poll after // the one that's running. - static void WakeupCurrent() { current()->got_wakeup_during_run_ = true; } + static void WakeupCurrent() { + current()->SetActionDuringRun(ActionDuringRun::kWakeup); + } // Return the current activity. // Additionally: @@ -154,6 +156,15 @@ class Activity : private Wakeable { Waker MakeNonOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); protected: + // Action received during a run, in priority order. + // If more than one action is received during a run, we use max() to resolve + // which one to report (so Cancel overrides Wakeup). + enum class ActionDuringRun : uint8_t { + kNone, // No action occured during run. + kWakeup, // A wakeup occured during run. + kCancel, // Cancel was called during run. + }; + inline virtual ~Activity() { if (handle_) { DropHandle(); @@ -170,8 +181,8 @@ class Activity : private Wakeable { static bool have_current() { return g_current_activity_ != nullptr; } // Check if we got an internal wakeup since the last time this function was // called. - bool got_wakeup() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - return absl::exchange(got_wakeup_during_run_, false); + ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return absl::exchange(action_during_run_, ActionDuringRun::kNone); } // Set the current activity at construction, clean it up at destruction. @@ -190,6 +201,12 @@ class Activity : private Wakeable { // completed. void WakeupComplete() { Unref(); } + // Mark the current activity as being cancelled (so we can actually cancel it + // after polling). + void CancelCurrent() { + current()->SetActionDuringRun(ActionDuringRun::kCancel); + } + private: class Handle; @@ -208,12 +225,20 @@ class Activity : private Wakeable { bool RefIfNonzero(); // Drop the (proved existing) wait handle. void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + // Set the action that occured during this run. + // We use max to combine actions so that cancellation overrides wakeups. + void SetActionDuringRun(ActionDuringRun action) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + action_during_run_ = std::max(action_during_run_, action); + } // Current refcount. std::atomic refs_{1}; - // If wakeup is called during Promise polling, we raise this flag and repoll - // until things settle out. - bool got_wakeup_during_run_ ABSL_GUARDED_BY(mu_) = false; + // If wakeup is called during Promise polling, we set this to Wakeup and + // repoll. If cancel is called during Promise polling, we set this to Cancel + // and cancel at the end of polling. + ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) = + ActionDuringRun::kNone; // Handle for long waits. Allows a very small weak pointer type object to // queue for wakeups while Activity may be deleted earlier. Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr; @@ -290,6 +315,10 @@ class PromiseActivity final size_t Size() override { return sizeof(*this); } void Cancel() final { + if (Activity::is_current()) { + CancelCurrent(); + return; + } bool was_done; { MutexLock lock(&mu_); @@ -377,7 +406,7 @@ class PromiseActivity final // the promise. absl::optional StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { GPR_ASSERT(is_current()); - do { + while (true) { // Run the promise. GPR_ASSERT(!done_); auto r = promise_holder_.promise(); @@ -387,8 +416,16 @@ class PromiseActivity final return IntoStatus(status); } // Continue looping til no wakeups occur. - } while (got_wakeup()); - return {}; + switch (GotActionDuringRun()) { + case ActionDuringRun::kNone: + return {}; + case ActionDuringRun::kWakeup: + break; + case ActionDuringRun::kCancel: + MarkDone(); + return absl::CancelledError(); + } + } } using Promise = typename Factory::Promise; diff --git a/test/core/promise/activity_test.cc b/test/core/promise/activity_test.cc index c5ea3542c0f..193e7364ac2 100644 --- a/test/core/promise/activity_test.cc +++ b/test/core/promise/activity_test.cc @@ -278,6 +278,56 @@ TEST(ActivityTest, WithContext) { EXPECT_TRUE(done); } +TEST(ActivityTest, CanCancelDuringExecution) { + ActivityPtr activity; + StrictMock> on_done; + int run_count = 0; + + activity = MakeActivity( + [&activity, &run_count]() -> Poll { + ++run_count; + switch (run_count) { + case 1: + return Pending{}; + case 2: + activity.reset(); + return Pending{}; + default: + abort(); + } + }, + InlineCallbackScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + + EXPECT_CALL(on_done, Call(absl::CancelledError())); + activity->ForceWakeup(); +} + +TEST(ActivityTest, CanCancelDuringSuccessfulExecution) { + ActivityPtr activity; + StrictMock> on_done; + int run_count = 0; + + activity = MakeActivity( + [&activity, &run_count]() -> Poll { + ++run_count; + switch (run_count) { + case 1: + return Pending{}; + case 2: + activity.reset(); + return absl::OkStatus(); + default: + abort(); + } + }, + InlineCallbackScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + + EXPECT_CALL(on_done, Call(absl::OkStatus())); + activity->ForceWakeup(); +} + TEST(WakerTest, CanWakeupEmptyWaker) { // Empty wakers should not do anything upon wakeup. Waker().Wakeup();