Handle cancellation inside polling an activity (#27369)

* allow cancellation during run

* add a test

* review feedback
reviewable/pr27395/r1
Craig Tiller 4 years ago committed by GitHub
parent 46257815d4
commit 4d7ad5271a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      src/core/lib/promise/activity.h
  2. 50
      test/core/promise/activity_test.cc

@ -122,7 +122,9 @@ class Activity : private Wakeable {
// Wakeup the current threads activity - will force a subsequent poll after
// the one that's running.
static void WakeupCurrent() { current()->got_wakeup_during_run_ = true; }
static void WakeupCurrent() {
current()->SetActionDuringRun(ActionDuringRun::kWakeup);
}
// Return the current activity.
// Additionally:
@ -154,6 +156,15 @@ class Activity : private Wakeable {
Waker MakeNonOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
protected:
// Action received during a run, in priority order.
// If more than one action is received during a run, we use max() to resolve
// which one to report (so Cancel overrides Wakeup).
enum class ActionDuringRun : uint8_t {
kNone, // No action occured during run.
kWakeup, // A wakeup occured during run.
kCancel, // Cancel was called during run.
};
inline virtual ~Activity() {
if (handle_) {
DropHandle();
@ -170,8 +181,8 @@ class Activity : private Wakeable {
static bool have_current() { return g_current_activity_ != nullptr; }
// Check if we got an internal wakeup since the last time this function was
// called.
bool got_wakeup() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return absl::exchange(got_wakeup_during_run_, false);
ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return absl::exchange(action_during_run_, ActionDuringRun::kNone);
}
// Set the current activity at construction, clean it up at destruction.
@ -190,6 +201,12 @@ class Activity : private Wakeable {
// completed.
void WakeupComplete() { Unref(); }
// Mark the current activity as being cancelled (so we can actually cancel it
// after polling).
void CancelCurrent() {
current()->SetActionDuringRun(ActionDuringRun::kCancel);
}
private:
class Handle;
@ -208,12 +225,20 @@ class Activity : private Wakeable {
bool RefIfNonzero();
// Drop the (proved existing) wait handle.
void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Set the action that occured during this run.
// We use max to combine actions so that cancellation overrides wakeups.
void SetActionDuringRun(ActionDuringRun action)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
action_during_run_ = std::max(action_during_run_, action);
}
// Current refcount.
std::atomic<uint32_t> refs_{1};
// If wakeup is called during Promise polling, we raise this flag and repoll
// until things settle out.
bool got_wakeup_during_run_ ABSL_GUARDED_BY(mu_) = false;
// If wakeup is called during Promise polling, we set this to Wakeup and
// repoll. If cancel is called during Promise polling, we set this to Cancel
// and cancel at the end of polling.
ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) =
ActionDuringRun::kNone;
// Handle for long waits. Allows a very small weak pointer type object to
// queue for wakeups while Activity may be deleted earlier.
Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
@ -290,6 +315,10 @@ class PromiseActivity final
size_t Size() override { return sizeof(*this); }
void Cancel() final {
if (Activity::is_current()) {
CancelCurrent();
return;
}
bool was_done;
{
MutexLock lock(&mu_);
@ -377,7 +406,7 @@ class PromiseActivity final
// the promise.
absl::optional<absl::Status> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(is_current());
do {
while (true) {
// Run the promise.
GPR_ASSERT(!done_);
auto r = promise_holder_.promise();
@ -387,8 +416,16 @@ class PromiseActivity final
return IntoStatus(status);
}
// Continue looping til no wakeups occur.
} while (got_wakeup());
return {};
switch (GotActionDuringRun()) {
case ActionDuringRun::kNone:
return {};
case ActionDuringRun::kWakeup:
break;
case ActionDuringRun::kCancel:
MarkDone();
return absl::CancelledError();
}
}
}
using Promise = typename Factory::Promise;

@ -278,6 +278,56 @@ TEST(ActivityTest, WithContext) {
EXPECT_TRUE(done);
}
TEST(ActivityTest, CanCancelDuringExecution) {
ActivityPtr activity;
StrictMock<MockFunction<void(absl::Status)>> on_done;
int run_count = 0;
activity = MakeActivity(
[&activity, &run_count]() -> Poll<absl::Status> {
++run_count;
switch (run_count) {
case 1:
return Pending{};
case 2:
activity.reset();
return Pending{};
default:
abort();
}
},
InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::CancelledError()));
activity->ForceWakeup();
}
TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
ActivityPtr activity;
StrictMock<MockFunction<void(absl::Status)>> on_done;
int run_count = 0;
activity = MakeActivity(
[&activity, &run_count]() -> Poll<absl::Status> {
++run_count;
switch (run_count) {
case 1:
return Pending{};
case 2:
activity.reset();
return absl::OkStatus();
default:
abort();
}
},
InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus()));
activity->ForceWakeup();
}
TEST(WakerTest, CanWakeupEmptyWaker) {
// Empty wakers should not do anything upon wakeup.
Waker().Wakeup();

Loading…
Cancel
Save