Promise Activities (#26921)

* promise sequences

* Activities for promises

* Automated change: Fix sanity tests

* review feedback

* review feedback

* review feedback

* review feedback

* review feedback

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/27120/head
Craig Tiller 3 years ago committed by GitHub
parent 2d2576513d
commit 5cdaec9a4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      BUILD
  2. 106
      CMakeLists.txt
  3. 135
      build_autogenerated.yaml
  4. 113
      src/core/lib/promise/activity.cc
  5. 437
      src/core/lib/promise/activity.h
  6. 4
      src/core/lib/promise/detail/promise_factory.h
  7. 72
      src/core/lib/promise/wait_set.h
  8. 16
      test/core/promise/BUILD
  9. 258
      test/core/promise/activity_test.cc
  10. 24
      tools/run_tests/generated/tests.json

36
BUILD

@ -1077,6 +1077,42 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "activity",
srcs = [
"src/core/lib/promise/activity.cc",
],
language = "c++",
public_hdrs = [
"src/core/lib/promise/activity.h",
],
deps = [
"atomic_utils",
"construct_destruct",
"context",
"gpr_base",
"gpr_codegen",
"poll",
"promise_factory",
"promise_status",
],
)
grpc_cc_library(
name = "wait_set",
external_deps = [
"absl/container:flat_hash_set",
],
language = "c++",
public_hdrs = [
"src/core/lib/promise/wait_set.h",
],
deps = [
"activity",
"gpr_platform",
],
)
grpc_cc_library(
name = "ref_counted",
language = "c++",

106
CMakeLists.txt generated

@ -725,6 +725,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c varint_test)
add_custom_target(buildtests_cxx)
add_dependencies(buildtests_cxx activity_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx address_sorting_test)
endif()
@ -7682,6 +7683,111 @@ target_link_libraries(varint_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(activity_test
src/core/ext/upb-generated/google/api/annotations.upb.c
src/core/ext/upb-generated/google/api/expr/v1alpha1/checked.upb.c
src/core/ext/upb-generated/google/api/expr/v1alpha1/syntax.upb.c
src/core/ext/upb-generated/google/api/http.upb.c
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/protobuf/duration.upb.c
src/core/ext/upb-generated/google/protobuf/empty.upb.c
src/core/ext/upb-generated/google/protobuf/struct.upb.c
src/core/ext/upb-generated/google/protobuf/timestamp.upb.c
src/core/ext/upb-generated/google/protobuf/wrappers.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc
src/core/lib/gpr/cpu_linux.cc
src/core/lib/gpr/cpu_posix.cc
src/core/lib/gpr/cpu_windows.cc
src/core/lib/gpr/env_linux.cc
src/core/lib/gpr/env_posix.cc
src/core/lib/gpr/env_windows.cc
src/core/lib/gpr/log.cc
src/core/lib/gpr/log_android.cc
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
src/core/lib/gpr/string_util_windows.cc
src/core/lib/gpr/string_windows.cc
src/core/lib/gpr/sync.cc
src/core/lib/gpr/sync_abseil.cc
src/core/lib/gpr/sync_posix.cc
src/core/lib/gpr/sync_windows.cc
src/core/lib/gpr/time.cc
src/core/lib/gpr/time_posix.cc
src/core/lib/gpr/time_precise.cc
src/core/lib/gpr/time_windows.cc
src/core/lib/gpr/tmpfile_msys.cc
src/core/lib/gpr/tmpfile_posix.cc
src/core/lib/gpr/tmpfile_windows.cc
src/core/lib/gpr/wrap_memcpy.cc
src/core/lib/gprpp/arena.cc
src/core/lib/gprpp/examine_stack.cc
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/stat_posix.cc
src/core/lib/gprpp/stat_windows.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/gprpp/time_util.cc
src/core/lib/profiling/basic_timers.cc
src/core/lib/profiling/stap_timers.cc
src/core/lib/promise/activity.cc
test/core/promise/activity_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(activity_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(activity_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::flat_hash_set
absl::memory
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings
absl::synchronization
absl::time
absl::optional
absl::variant
upb
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -4194,6 +4194,141 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: activity_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/api/annotations.upb.h
- src/core/ext/upb-generated/google/api/expr/v1alpha1/checked.upb.h
- src/core/ext/upb-generated/google/api/expr/v1alpha1/syntax.upb.h
- src/core/ext/upb-generated/google/api/http.upb.h
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/protobuf/duration.upb.h
- src/core/ext/upb-generated/google/protobuf/empty.upb.h
- src/core/ext/upb-generated/google/protobuf/struct.upb.h
- src/core/ext/upb-generated/google/protobuf/timestamp.upb.h
- src/core/ext/upb-generated/google/protobuf/wrappers.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/string_windows.h
- src/core/lib/gpr/time_precise.h
- src/core/lib/gpr/tls.h
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/arena.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
- src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/global_config.h
- src/core/lib/gprpp/global_config_custom.h
- src/core/lib/gprpp/global_config_env.h
- src/core/lib/gprpp/global_config_generic.h
- src/core/lib/gprpp/host_port.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/stat.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
- src/core/lib/profiling/timers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/join.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/wait_set.h
src:
- src/core/ext/upb-generated/google/api/annotations.upb.c
- src/core/ext/upb-generated/google/api/expr/v1alpha1/checked.upb.c
- src/core/ext/upb-generated/google/api/expr/v1alpha1/syntax.upb.c
- src/core/ext/upb-generated/google/api/http.upb.c
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/protobuf/duration.upb.c
- src/core/ext/upb-generated/google/protobuf/empty.upb.c
- src/core/ext/upb-generated/google/protobuf/struct.upb.c
- src/core/ext/upb-generated/google/protobuf/timestamp.upb.c
- src/core/ext/upb-generated/google/protobuf/wrappers.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/arena.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
- src/core/lib/promise/activity.cc
- test/core/promise/activity_test.cc
deps:
- absl/base:base
- absl/base:core_headers
- absl/container:flat_hash_set
- absl/memory:memory
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings
- absl/synchronization:synchronization
- absl/time:time
- absl/types:optional
- absl/types:variant
- upb
uses_polling: false
- name: address_sorting_test
gtest: true
build: test

@ -0,0 +1,113 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/promise/activity.h"
namespace grpc_core {
///////////////////////////////////////////////////////////////////////////////
// GLOBALS
ABSL_CONST_INIT GPR_THREAD_LOCAL(Activity*) Activity::g_current_activity_ =
nullptr;
Waker::Unwakeable Waker::unwakeable_;
///////////////////////////////////////////////////////////////////////////////
// HELPER TYPES
// Weak handle to an Activity.
// Handle can persist while Activity goes away.
class Activity::Handle final : public Wakeable {
public:
explicit Handle(Activity* activity) : activity_(activity) {}
// Ref the Handle (not the activity).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// Activity is going away... drop its reference and sever the connection back.
void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
mu_.Lock();
GPR_ASSERT(activity_ != nullptr);
activity_ = nullptr;
mu_.Unlock();
Unref();
}
// Activity needs to wake up (if it still exists!) - wake it up, and drop the
// ref that was kept for this handle.
void Wakeup() override ABSL_LOCKS_EXCLUDED(mu_) {
mu_.Lock();
// Note that activity refcount can drop to zero, but we could win the lock
// against DropActivity, so we need to only increase activities refcount if
// it is non-zero.
if (activity_ && activity_->RefIfNonzero()) {
Activity* activity = activity_;
mu_.Unlock();
// Activity still exists and we have a reference: wake it up, which will
// drop the ref.
activity->Wakeup();
} else {
// Could not get the activity - it's either gone or going. No need to wake
// it up!
mu_.Unlock();
}
// Drop the ref to the handle (we have one ref = one wakeup semantics).
Unref();
}
void Drop() override { Unref(); }
private:
// Unref the Handle (not the activity).
void Unref() {
if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
delete this;
}
}
// Two initial refs: one for the waiter that caused instantiation, one for the
// activity.
std::atomic<size_t> refs_{2};
Mutex mu_ ABSL_ACQUIRED_AFTER(activity_->mu_);
Activity* activity_ ABSL_GUARDED_BY(mu_);
};
///////////////////////////////////////////////////////////////////////////////
// ACTIVITY IMPLEMENTATION
bool Activity::RefIfNonzero() { return IncrementIfNonzero(&refs_); }
Activity::Handle* Activity::RefHandle() {
if (handle_ == nullptr) {
// No handle created yet - construct it and return it.
handle_ = new Handle(this);
return handle_;
} else {
// Already had to create a handle, ref & return it.
handle_->Ref();
return handle_;
}
}
void Activity::DropHandle() {
handle_->DropActivity();
handle_ = nullptr;
}
Waker Activity::MakeNonOwningWaker() { return Waker(RefHandle()); }
} // namespace grpc_core

@ -0,0 +1,437 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_ACTIVITY_H
#define GRPC_CORE_LIB_PROMISE_ACTIVITY_H
#include <grpc/impl/codegen/port_platform.h>
#include <functional>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
// A Wakeable object is used by queues to wake activities.
class Wakeable {
public:
// Wake up the underlying activity.
// After calling, this Wakeable cannot be used again.
virtual void Wakeup() = 0;
// Drop this wakeable without waking up the underlying activity.
virtual void Drop() = 0;
protected:
inline ~Wakeable() {}
};
// An owning reference to a Wakeable.
// This type is non-copyable but movable.
class Waker {
public:
explicit Waker(Wakeable* wakeable) : wakeable_(wakeable) {}
Waker() : wakeable_(&unwakeable_) {}
~Waker() { wakeable_->Drop(); }
Waker(const Waker&) = delete;
Waker& operator=(const Waker&) = delete;
Waker(Waker&& other) noexcept : wakeable_(other.wakeable_) {
other.wakeable_ = &unwakeable_;
}
Waker& operator=(Waker&& other) noexcept {
std::swap(wakeable_, other.wakeable_);
return *this;
}
// Wake the underlying activity.
void Wakeup() {
wakeable_->Wakeup();
wakeable_ = &unwakeable_;
}
template <typename H>
friend H AbslHashValue(H h, const Waker& w) {
return H::combine(std::move(h), w.wakeable_);
}
bool operator==(const Waker& other) const noexcept {
return wakeable_ == other.wakeable_;
}
private:
class Unwakeable final : public Wakeable {
public:
void Wakeup() final { abort(); }
void Drop() final {}
};
Wakeable* wakeable_;
static Unwakeable unwakeable_;
};
// An Activity tracks execution of a single promise.
// It executes the promise under a mutex.
// When the promise stalls, it registers the containing activity to be woken up
// later.
// The activity takes a callback, which will be called exactly once with the
// result of execution.
// Activity execution may be cancelled by simply deleting the activity. In such
// a case, if execution had not already finished, the done callback would be
// called with absl::CancelledError().
// Activity also takes a CallbackScheduler instance on which to schedule
// callbacks to itself in a lock-clean environment.
class Activity : private Wakeable {
public:
// Cancel execution of the underlying promise.
virtual void Cancel() ABSL_LOCKS_EXCLUDED(mu_) = 0;
// Destroy the Activity - used for the type alias ActivityPtr.
struct Deleter {
void operator()(Activity* activity) {
activity->Cancel();
activity->Unref();
}
};
// Fetch the size of the implementation of this activity.
virtual size_t Size() = 0;
// Wakeup the current threads activity - will force a subsequent poll after
// the one that's running.
static void WakeupCurrent() { current()->got_wakeup_during_run_ = true; }
// Return the current activity.
// Additionally:
// - assert that there is a current activity (and catch bugs if there's not)
// - indicate to thread safety analysis that the current activity is indeed
// locked
// - back up that assertation with a runtime check in debug builds (it's
// prohibitively expensive in non-debug builds)
static Activity* current() ABSL_ASSERT_EXCLUSIVE_LOCK(current()->mu_) {
#ifndef NDEBUG
GPR_ASSERT(g_current_activity_);
if (g_current_activity_ != nullptr) {
g_current_activity_->mu_.AssertHeld();
}
#endif
return g_current_activity_;
}
// Produce an activity-owning Waker. The produced waker will keep the activity
// alive until it's awoken or dropped.
Waker MakeOwningWaker() {
Ref();
return Waker(this);
}
// Produce a non-owning Waker. The waker will own a small heap allocated weak
// pointer to this activity. This is more suitable for wakeups that may not be
// delivered until long after the activity should be destroyed.
Waker MakeNonOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
protected:
inline virtual ~Activity() {
if (handle_) {
DropHandle();
}
}
// All promise execution occurs under this mutex.
Mutex mu_;
// Check if this activity is the current activity executing on the current
// thread.
bool is_current() const { return this == g_current_activity_; }
// Check if there is an activity executing on the current thread.
static bool have_current() { return g_current_activity_ != nullptr; }
// Check if we got an internal wakeup since the last time this function was
// called.
bool got_wakeup() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return absl::exchange(got_wakeup_during_run_, false);
}
// Set the current activity at construction, clean it up at destruction.
class ScopedActivity {
public:
explicit ScopedActivity(Activity* activity) {
GPR_ASSERT(g_current_activity_ == nullptr);
g_current_activity_ = activity;
}
~ScopedActivity() { g_current_activity_ = nullptr; }
ScopedActivity(const ScopedActivity&) = delete;
ScopedActivity& operator=(const ScopedActivity&) = delete;
};
// Implementors of Wakeable::Wakeup should call this after the wakeup has
// completed.
void WakeupComplete() { Unref(); }
private:
class Handle;
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
delete this;
}
}
// Return a Handle instance with a ref so that it can be stored waiting for
// some wakeup.
Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// If our refcount is non-zero, ref and return true.
// Otherwise, return false.
bool RefIfNonzero();
// Drop the (proved existing) wait handle.
void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Current refcount.
std::atomic<uint32_t> refs_{1};
// If wakeup is called during Promise polling, we raise this flag and repoll
// until things settle out.
bool got_wakeup_during_run_ ABSL_GUARDED_BY(mu_) = false;
// Handle for long waits. Allows a very small weak pointer type object to
// queue for wakeups while Activity may be deleted earlier.
Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
// Set during RunLoop to the Activity that's executing.
// Being set implies that mu_ is held.
static GPR_THREAD_LOCAL(Activity*) g_current_activity_;
};
// Owned pointer to one Activity.
using ActivityPtr = std::unique_ptr<Activity, Activity::Deleter>;
namespace promise_detail {
template <typename Context>
class ContextHolder {
public:
explicit ContextHolder(Context value) : value_(std::move(value)) {}
Context* GetContext() { return &value_; }
private:
Context value_;
};
template <typename Context>
class ContextHolder<Context*> {
public:
explicit ContextHolder(Context* value) : value_(value) {}
Context* GetContext() { return value_; }
private:
Context* value_;
};
template <typename... Contexts>
class EnterContexts : public promise_detail::Context<Contexts>... {
public:
explicit EnterContexts(Contexts*... contexts)
: promise_detail::Context<Contexts>(contexts)... {}
};
// Implementation details for an Activity of an arbitrary type of promise.
template <class F, class CallbackScheduler, class OnDone, typename... Contexts>
class PromiseActivity final
: public Activity,
private promise_detail::ContextHolder<Contexts>... {
public:
using Factory = PromiseFactory<void, F>;
PromiseActivity(F promise_factory, CallbackScheduler callback_scheduler,
OnDone on_done, Contexts... contexts)
: Activity(),
ContextHolder<Contexts>(std::move(contexts))...,
callback_scheduler_(std::move(callback_scheduler)),
on_done_(std::move(on_done)) {
// Lock, construct an initial promise from the factory, and step it.
// This may hit a waiter, which could expose our this pointer to other
// threads, meaning we do need to hold this mutex even though we're still
// constructing.
mu_.Lock();
auto status = Start(Factory(std::move(promise_factory)));
mu_.Unlock();
// We may complete immediately.
if (status.has_value()) {
on_done_(std::move(*status));
}
}
~PromiseActivity() override {
// We shouldn't destruct without calling Cancel() first, and that must get
// us to be done_, so we assume that and have no logic to destruct the
// promise here.
GPR_ASSERT(done_);
}
size_t Size() override { return sizeof(*this); }
void Cancel() final {
bool was_done;
{
MutexLock lock(&mu_);
// Check if we were done, and flag done.
was_done = done_;
if (!done_) MarkDone();
}
// If we were not done, then call the on_done callback.
if (!was_done) {
on_done_(absl::CancelledError());
}
}
private:
// Wakeup this activity. Arrange to poll the activity again at a convenient
// time: this could be inline if it's deemed safe, or it could be by passing
// the activity to an external threadpool to run. If the activity is already
// running on this thread, a note is taken of such and the activity is
// repolled if it doesn't complete.
void Wakeup() final {
// If there's no active activity, we can just run inline.
if (!Activity::have_current()) {
Step();
WakeupComplete();
return;
}
// If there is an active activity, but hey it's us, flag that and we'll loop
// in RunLoop (that's calling from above here!).
if (Activity::is_current()) {
WakeupCurrent();
WakeupComplete();
return;
}
// Can't safely run, so ask to run later.
callback_scheduler_([this]() {
this->Step();
this->WakeupComplete();
});
}
// Drop a wakeup
void Drop() final { this->WakeupComplete(); }
// Notification that we're no longer executing - it's ok to destruct the
// promise.
void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(!done_);
done_ = true;
Destruct(&promise_holder_.promise);
}
// In response to Wakeup, run the Promise state machine again until it
// settles. Then check for completion, and if we have completed, call on_done.
void Step() ABSL_LOCKS_EXCLUDED(mu_) {
// Poll the promise until things settle out under a lock.
mu_.Lock();
if (done_) {
// We might get some spurious wakeups after finishing.
mu_.Unlock();
return;
}
auto status = RunStep();
mu_.Unlock();
if (status.has_value()) {
on_done_(std::move(*status));
}
}
// The main body of a step: set the current activity, and any contexts, and
// then run the main polling loop. Contained in a function by itself in order
// to keep the scoping rules a little easier in Step().
absl::optional<absl::Status> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
return StepLoop();
}
// Similarly to RunStep, but additionally construct the promise from a promise
// factory before entering the main loop. Called once from the constructor.
absl::optional<absl::Status> Start(Factory promise_factory)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
Construct(&promise_holder_.promise, promise_factory.Once());
return StepLoop();
}
// Until there are no wakeups from within and the promise is incomplete: poll
// the promise.
absl::optional<absl::Status> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(is_current());
do {
// Run the promise.
GPR_ASSERT(!done_);
auto r = promise_holder_.promise();
if (auto* status = absl::get_if<kPollReadyIdx>(&r)) {
// If complete, destroy the promise, flag done, and exit this loop.
MarkDone();
return IntoStatus(status);
}
// Continue looping til no wakeups occur.
} while (got_wakeup());
return {};
}
using Promise = typename Factory::Promise;
// We wrap the promise in a union to allow control over the construction
// simultaneously with annotating mutex requirements and noting that the
// promise contained may not use any memory.
union PromiseHolder {
PromiseHolder() {}
~PromiseHolder() {}
GPR_NO_UNIQUE_ADDRESS Promise promise;
};
GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu_);
// Schedule callbacks on some external executor.
GPR_NO_UNIQUE_ADDRESS CallbackScheduler callback_scheduler_;
// Callback on completion of the promise.
GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
// Has execution completed?
GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu_) = false;
};
} // namespace promise_detail
// Given a functor that returns a promise (a promise factory), a callback for
// completion, and a callback scheduler, construct an activity.
template <typename Factory, typename CallbackScheduler, typename OnDone,
typename... Contexts>
ActivityPtr MakeActivity(Factory promise_factory,
CallbackScheduler callback_scheduler, OnDone on_done,
Contexts... contexts) {
return ActivityPtr(
new promise_detail::PromiseActivity<Factory, CallbackScheduler, OnDone,
Contexts...>(
std::move(promise_factory), std::move(callback_scheduler),
std::move(on_done), std::move(contexts)...));
}
// A callback scheduler that simply crashes
struct NoCallbackScheduler {
template <typename F>
void operator()(F) {
abort();
}
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_ACTIVITY_H

@ -98,9 +98,9 @@ class Curried {
// capturing A.
template <typename A, typename F>
absl::enable_if_t<!IsVoidCallable<ResultOf<F(A)>>::value,
PromiseLike<Curried<A, F>>>
PromiseLike<Curried<F, A>>>
PromiseFactoryImpl(F&& f, A&& arg) {
return Curried<A, F>(std::forward<F>(f), std::forward<A>(arg));
return Curried<F, A>(std::forward<F>(f), std::forward<A>(arg));
}
// Promote a callable() -> T|Poll<T> to a PromiseFactory(A) -> Promise<T>

@ -0,0 +1,72 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_WAIT_SET_H
#define GRPC_CORE_LIB_PROMISE_WAIT_SET_H
#include <grpc/impl/codegen/port_platform.h>
#include "absl/container/flat_hash_set.h"
#include "src/core/lib/promise/activity.h"
namespace grpc_core {
// Helper type that can be used to enqueue many Activities waiting for some
// external state.
// Typically the external state should be guarded by mu_, and a call to
// WakeAllAndUnlock should be made when the state changes.
// Promises should bottom out polling inside pending(), which will register for
// wakeup and return Pending().
// Queues handles to Activities, and not Activities themselves, meaning that if
// an Activity is destroyed prior to wakeup we end up holding only a small
// amount of memory (around 16 bytes + malloc overhead) until the next wakeup
// occurs.
class WaitSet final {
using WakerSet = absl::flat_hash_set<Waker>;
public:
// Register for wakeup, return Pending(). If state is not ready to proceed,
// Promises should bottom out here.
Pending AddPending(Waker waker) {
pending_.emplace(std::move(waker));
return Pending();
}
class WakeupSet {
public:
void Wakeup() {
while (!wakeup_.empty()) {
wakeup_.extract(wakeup_.begin()).value().Wakeup();
}
}
private:
friend class WaitSet;
explicit WakeupSet(WakerSet&& wakeup)
: wakeup_(std::forward<WakerSet>(wakeup)) {}
WakerSet wakeup_;
};
GRPC_MUST_USE_RESULT WakeupSet TakeWakeupSet() {
return WakeupSet(std::move(pending_));
}
private:
// Handles to activities that need to be awoken.
WakerSet pending_;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_WAIT_SET_H

@ -172,3 +172,19 @@ grpc_cc_test(
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "activity_test",
srcs = ["activity_test.cc"],
external_deps = ["gtest"],
language = "c++",
uses_polling = False,
deps = [
"//:activity",
"//:join",
"//:promise",
"//:seq",
"//:wait_set",
"//test/core/util:grpc_suppressions",
],
)

@ -0,0 +1,258 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/activity.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/wait_set.h"
using testing::_;
using testing::Mock;
using testing::MockFunction;
using testing::SaveArg;
using testing::StrictMock;
namespace grpc_core {
class MockCallbackScheduler {
public:
MOCK_METHOD(void, Schedule, (std::function<void()>));
};
// A simple Barrier type: stalls progress until it is 'cleared'.
class Barrier {
public:
struct Result {};
Promise<Result> Wait() {
return [this]() -> Poll<Result> {
absl::MutexLock lock(&mu_);
if (cleared_) {
return Result{};
} else {
return wait_set_.AddPending(Activity::current()->MakeOwningWaker());
}
};
}
void Clear() {
mu_.Lock();
cleared_ = true;
auto wakeup = wait_set_.TakeWakeupSet();
mu_.Unlock();
wakeup.Wakeup();
}
private:
absl::Mutex mu_;
WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
bool cleared_ ABSL_GUARDED_BY(mu_) = false;
};
// A simple Barrier type: stalls progress until it is 'cleared'.
// This variant supports only a single waiter.
class SingleBarrier {
public:
struct Result {};
Promise<Result> Wait() {
return [this]() -> Poll<Result> {
absl::MutexLock lock(&mu_);
if (cleared_) {
return Result{};
} else {
waker_ = Activity::current()->MakeOwningWaker();
return Pending();
}
};
}
void Clear() {
mu_.Lock();
cleared_ = true;
auto waker = std::move(waker_);
mu_.Unlock();
waker.Wakeup();
}
private:
absl::Mutex mu_;
Waker waker_ ABSL_GUARDED_BY(mu_);
bool cleared_ ABSL_GUARDED_BY(mu_) = false;
};
TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] { return [] { return absl::OkStatus(); }; }, NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::CancelledError()));
MakeActivity(
[] { return [] { return absl::CancelledError(); }; },
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(ActivityTest, DropImmediately) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::CancelledError()));
MakeActivity(
[] { return []() -> Poll<absl::Status> { return Pending(); }; },
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(ActivityTest, Cancel) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto activity = MakeActivity(
[] { return []() -> Poll<absl::Status> { return Pending(); }; },
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::CancelledError()));
activity->Cancel();
Mock::VerifyAndClearExpectations(&on_done);
activity.reset();
}
template <typename B>
class BarrierTest : public testing::Test {
public:
using Type = B;
};
using BarrierTestTypes = testing::Types<Barrier, SingleBarrier>;
TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
TYPED_TEST(BarrierTest, Barrier) {
typename TestFixture::Type b;
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto activity = MakeActivity(
[&b] {
return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Clearing the barrier should let the activity proceed to return a result.
EXPECT_CALL(on_done, Call(absl::OkStatus()));
b.Clear();
}
TYPED_TEST(BarrierTest, BarrierPing) {
typename TestFixture::Type b1;
typename TestFixture::Type b2;
StrictMock<MockFunction<void(absl::Status)>> on_done1;
StrictMock<MockFunction<void(absl::Status)>> on_done2;
MockCallbackScheduler scheduler;
auto activity1 = MakeActivity(
[&b1, &b2] {
return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
// Clear the barrier whilst executing an activity
b2.Clear();
return absl::OkStatus();
});
},
[&scheduler](std::function<void()> f) { scheduler.Schedule(f); },
[&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
auto activity2 = MakeActivity(
[&b2] {
return Seq(b2.Wait(), [](typename TestFixture::Type::Result) {
return absl::OkStatus();
});
},
[&scheduler](std::function<void()> f) { scheduler.Schedule(f); },
[&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
EXPECT_CALL(on_done1, Call(absl::OkStatus()));
// Since barrier triggers inside activity1 promise, activity2 wakeup will be
// scheduled from a callback.
std::function<void()> cb;
EXPECT_CALL(scheduler, Schedule(_)).WillOnce(SaveArg<0>(&cb));
b1.Clear();
Mock::VerifyAndClearExpectations(&on_done1);
EXPECT_CALL(on_done2, Call(absl::OkStatus()));
cb();
}
TYPED_TEST(BarrierTest, WakeSelf) {
typename TestFixture::Type b;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&b] {
return Seq(Join(b.Wait(),
[&b] {
b.Clear();
return 1;
}),
[](std::tuple<typename TestFixture::Type::Result, int>) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TYPED_TEST(BarrierTest, WakeAfterDestruction) {
typename TestFixture::Type b;
{
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::CancelledError()));
MakeActivity(
[&b] {
return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
b.Clear();
}
struct TestContext {
bool* done;
};
template <>
struct ContextType<TestContext> {};
TEST(ActivityTest, WithContext) {
bool done = false;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
*GetContext<TestContext>()->done = true;
return Immediate(absl::OkStatus());
},
NoCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
TestContext{&done});
EXPECT_TRUE(done);
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -3081,6 +3081,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "activity_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save