From 8bed0f30fb3862c3e933acb8911f0afcc2f92a0f Mon Sep 17 00:00:00 2001 From: gRPC Team Bot Date: Mon, 13 Nov 2023 21:16:34 +0000 Subject: [PATCH 1/7] Internal change PiperOrigin-RevId: 582062765 --- BUILD | 2 +- bazel/googleapis.BUILD | 2 +- include/grpc/impl/channel_arg_names.h | 2 +- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 2 +- src/objective-c/RxLibrary/GRXMappingWriter.h | 2 +- src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m | 2 +- src/proto/grpc/testing/BUILD | 2 +- src/proto/grpc/testing/xds/v3/BUILD | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/BUILD b/BUILD index 3fd5a6c6388..a83ccf8d7d4 100644 --- a/BUILD +++ b/BUILD @@ -30,8 +30,8 @@ licenses(["reciprocal"]) package( default_visibility = ["//visibility:public"], features = [ - "layering_check", "-parse_headers", + "layering_check", ], ) diff --git a/bazel/googleapis.BUILD b/bazel/googleapis.BUILD index dc7e05f7424..ee9473b1526 100644 --- a/bazel/googleapis.BUILD +++ b/bazel/googleapis.BUILD @@ -15,7 +15,7 @@ licenses(["notice"]) package( - default_visibility = ["//visibility:public"] + default_visibility = ["//visibility:public"], ) # This is needed for the dependency on google_cloud_cpp to work. diff --git a/include/grpc/impl/channel_arg_names.h b/include/grpc/impl/channel_arg_names.h index def07be248d..b4cece27c9e 100644 --- a/include/grpc/impl/channel_arg_names.h +++ b/include/grpc/impl/channel_arg_names.h @@ -15,7 +15,7 @@ #ifndef GRPC_IMPL_CHANNEL_ARG_NAMES_H #define GRPC_IMPL_CHANNEL_ARG_NAMES_H -// IWYU pragma: private, include "third_party/grpc/include/grpc/grpc.h" +// IWYU pragma: private, include // IWYU pragma: friend "src/.*" // IWYU pragma: friend "test/.*" diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 115195463de..9686c7567a7 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -18,7 +18,7 @@ #import "GRXConcurrentWriteable.h" -#import +#import "GRXWriteable.h" @interface GRXConcurrentWriteable () // This is atomic so that cancellation can nillify it from any thread. diff --git a/src/objective-c/RxLibrary/GRXMappingWriter.h b/src/objective-c/RxLibrary/GRXMappingWriter.h index a344f966228..90a6e29ef21 100644 --- a/src/objective-c/RxLibrary/GRXMappingWriter.h +++ b/src/objective-c/RxLibrary/GRXMappingWriter.h @@ -16,7 +16,7 @@ * */ -#import "RxLibrary/GRXForwardingWriter.h" +#import "GRXForwardingWriter.h" /** A "proxy" writer that transforms all the values of its input writer by using a mapping function. */ diff --git a/src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m b/src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m index c797b0e91bc..39ceabad907 100644 --- a/src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m +++ b/src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m @@ -14,7 +14,7 @@ * limitations under the License. */ -#import "GRPCBlockCallbackResponseHandler.h" +#import @implementation GRPCBlockCallbackResponseHandler { void (^_initialMetadataCallback)(NSDictionary *); diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index 0f7bbbecf63..41ab62e76e2 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") -load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") licenses(["notice"]) diff --git a/src/proto/grpc/testing/xds/v3/BUILD b/src/proto/grpc/testing/xds/v3/BUILD index 9c02b2a9f6c..f72941136cc 100644 --- a/src/proto/grpc/testing/xds/v3/BUILD +++ b/src/proto/grpc/testing/xds/v3/BUILD @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") +load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") licenses(["notice"]) From 14d6b49fed7843adce9a17f133b1df233c598dc9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Nov 2023 15:56:41 -0800 Subject: [PATCH 2/7] [pr] Disable title check for now (#34932) --- ...-tag-and-check-title.yaml => pr-auto-tag.yaml} | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) rename .github/workflows/{pr-auto-tag-and-check-title.yaml => pr-auto-tag.yaml} (57%) diff --git a/.github/workflows/pr-auto-tag-and-check-title.yaml b/.github/workflows/pr-auto-tag.yaml similarity index 57% rename from .github/workflows/pr-auto-tag-and-check-title.yaml rename to .github/workflows/pr-auto-tag.yaml index 2bd85cf3034..c0e440b7c44 100644 --- a/.github/workflows/pr-auto-tag-and-check-title.yaml +++ b/.github/workflows/pr-auto-tag.yaml @@ -1,4 +1,4 @@ -name: PR Title Check & Tag +name: PR Auto Tag on: pull_request_target: types: [opened, reopened, synchronize, edited] @@ -17,16 +17,3 @@ jobs: with: repo-token: "${{ secrets.GITHUB_TOKEN }}" sync-labels: "" - - title-check: - permissions: - contents: read - pull-requests: write - - runs-on: ubuntu-latest - steps: - - uses: thehanimo/pr-title-checker@v1.3.5 - with: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - pass_on_octokit_error: false - configuration_path: ".github/pr_title_checker_config.json" From c09a008414bd5cc01ff940bae5b3a0966a7fac19 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Nov 2023 15:55:33 -0800 Subject: [PATCH 3/7] test commit PiperOrigin-RevId: 582111135 --- doc/g_stands_for.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/g_stands_for.md b/doc/g_stands_for.md index 6c4ab101081..33267f9a111 100644 --- a/doc/g_stands_for.md +++ b/doc/g_stands_for.md @@ -61,3 +61,4 @@ - 1.59 'g' stands for ['generative'](https://github.com/grpc/grpc/tree/v1.59.x) - 1.60 'g' stands for ['gjallarhorn'](https://github.com/grpc/grpc/tree/v1.60.x) - 1.61 'g' stands for ['grand'](https://github.com/grpc/grpc/tree/master) +- 1.62 'g' stats for 'guitar' From 1ccb6a868b8b759fe0f77ed5eb55f04a1d1163af Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Nov 2023 11:19:38 -0800 Subject: [PATCH 4/7] Remove badly formatted line PiperOrigin-RevId: 582384359 --- doc/g_stands_for.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/g_stands_for.md b/doc/g_stands_for.md index 33267f9a111..830257c29cd 100644 --- a/doc/g_stands_for.md +++ b/doc/g_stands_for.md @@ -61,4 +61,4 @@ - 1.59 'g' stands for ['generative'](https://github.com/grpc/grpc/tree/v1.59.x) - 1.60 'g' stands for ['gjallarhorn'](https://github.com/grpc/grpc/tree/v1.60.x) - 1.61 'g' stands for ['grand'](https://github.com/grpc/grpc/tree/master) -- 1.62 'g' stats for 'guitar' +- 1.62 'g' stands for 'guitar' From e884d6c190773dafda7a0760be2c8020c4e76873 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Nov 2023 12:32:15 -0800 Subject: [PATCH 5/7] Remove test code PiperOrigin-RevId: 582406922 --- doc/g_stands_for.md | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/g_stands_for.md b/doc/g_stands_for.md index 830257c29cd..6c4ab101081 100644 --- a/doc/g_stands_for.md +++ b/doc/g_stands_for.md @@ -61,4 +61,3 @@ - 1.59 'g' stands for ['generative'](https://github.com/grpc/grpc/tree/v1.59.x) - 1.60 'g' stands for ['gjallarhorn'](https://github.com/grpc/grpc/tree/v1.60.x) - 1.61 'g' stands for ['grand'](https://github.com/grpc/grpc/tree/master) -- 1.62 'g' stands for 'guitar' From f11c010e5183125c0cd09e4c12f31222d9bb0fb2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Nov 2023 14:58:29 -0800 Subject: [PATCH 6/7] [promises] Add awaitable spawn to Party (#34744) Add a variant of `Spawn` that returns a promise that can be awaited by another activity. This allows us to simply implement complex cross-activity synchronization. (necessary building block for #34740) Also adds an inter-activity latch as a building block to test this work. Closes #34744 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/34744 from ctiller:ninteen-ninety-nine 19074b255f335a9b02fc43d469190fa8868af3b4 PiperOrigin-RevId: 582450643 --- CMakeLists.txt | 34 ++++++ build_autogenerated.yaml | 18 ++- src/core/BUILD | 20 ++++ src/core/lib/promise/activity.cc | 19 +++- src/core/lib/promise/activity.h | 9 +- src/core/lib/promise/inter_activity_latch.h | 98 +++++++++++++++++ src/core/lib/promise/latch.h | 7 +- src/core/lib/promise/party.cc | 2 +- src/core/lib/promise/party.h | 103 +++++++++++++++++- src/core/lib/promise/wait_set.h | 6 + test/core/promise/BUILD | 22 ++++ .../core/promise/inter_activity_latch_test.cc | 103 ++++++++++++++++++ test/core/promise/party_test.cc | 26 +++++ tools/run_tests/generated/tests.json | 24 ++++ 14 files changed, 478 insertions(+), 13 deletions(-) create mode 100644 src/core/lib/promise/inter_activity_latch.h create mode 100644 test/core/promise/inter_activity_latch_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index bd6a8691f56..4c8979a56ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1109,6 +1109,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx init_test) add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test) add_dependencies(buildtests_cxx insecure_security_connector_test) + add_dependencies(buildtests_cxx inter_activity_latch_test) add_dependencies(buildtests_cxx inter_activity_pipe_test) add_dependencies(buildtests_cxx interceptor_list_test) add_dependencies(buildtests_cxx interop_client) @@ -14936,6 +14937,39 @@ target_link_libraries(insecure_security_connector_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(inter_activity_latch_test + test/core/promise/inter_activity_latch_test.cc +) +target_compile_features(inter_activity_latch_test PUBLIC cxx_std_14) +target_include_directories(inter_activity_latch_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(inter_activity_latch_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + grpc +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f437de380a7..bd2a9aa0bbd 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10835,6 +10835,20 @@ targets: deps: - gtest - grpc_test_util +- name: inter_activity_latch_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/inter_activity_latch.h + - src/core/lib/promise/wait_set.h + src: + - test/core/promise/inter_activity_latch_test.cc + deps: + - gtest + - grpc + uses_polling: false - name: inter_activity_pipe_test gtest: true build: test @@ -12434,7 +12448,9 @@ targets: gtest: true build: test language: c++ - headers: [] + headers: + - src/core/lib/promise/inter_activity_latch.h + - src/core/lib/promise/wait_set.h src: - test/core/promise/party_test.cc deps: diff --git a/src/core/BUILD b/src/core/BUILD index 3df1d40375b..cbf8ed32ea0 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -478,6 +478,7 @@ grpc_cc_library( "arena", "construct_destruct", "context", + "poll", "promise_factory", "promise_trace", "ref_counted", @@ -874,6 +875,25 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "inter_activity_latch", + external_deps = [ + "absl/base:core_headers", + "absl/strings", + ], + language = "c++", + public_hdrs = [ + "lib/promise/inter_activity_latch.h", + ], + deps = [ + "activity", + "poll", + "promise_trace", + "wait_set", + "//:gpr", + ], +) + grpc_cc_library( name = "interceptor_list", hdrs = [ diff --git a/src/core/lib/promise/activity.cc b/src/core/lib/promise/activity.cc index 7beaeb1c704..60024b536cb 100644 --- a/src/core/lib/promise/activity.cc +++ b/src/core/lib/promise/activity.cc @@ -25,7 +25,6 @@ #include "absl/strings/str_join.h" #include "src/core/lib/gprpp/atomic_utils.h" -#include "src/core/lib/gprpp/crash.h" namespace grpc_core { @@ -84,7 +83,23 @@ class FreestandingActivity::Handle final : public Wakeable { } void WakeupAsync(WakeupMask) override ABSL_LOCKS_EXCLUDED(mu_) { - Crash("not implemented"); + mu_.Lock(); + // Note that activity refcount can drop to zero, but we could win the lock + // against DropActivity, so we need to only increase activities refcount if + // it is non-zero. + if (activity_ && activity_->RefIfNonzero()) { + FreestandingActivity* activity = activity_; + mu_.Unlock(); + // Activity still exists and we have a reference: wake it up, which will + // drop the ref. + activity->WakeupAsync(0); + } else { + // Could not get the activity - it's either gone or going. No need to wake + // it up! + mu_.Unlock(); + } + // Drop the ref to the handle (we have one ref = one wakeup semantics). + Unref(); } void Drop(WakeupMask) override { Unref(); } diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 4325207d93d..a05ddab8afb 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -32,7 +32,6 @@ #include #include "src/core/lib/gprpp/construct_destruct.h" -#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/sync.h" @@ -502,7 +501,7 @@ class PromiseActivity final // the activity to an external threadpool to run. If the activity is already // running on this thread, a note is taken of such and the activity is // repolled if it doesn't complete. - void Wakeup(WakeupMask) final { + void Wakeup(WakeupMask m) final { // If there is an active activity, but hey it's us, flag that and we'll loop // in RunLoop (that's calling from above here!). if (Activity::is_current()) { @@ -511,6 +510,10 @@ class PromiseActivity final WakeupComplete(); return; } + WakeupAsync(m); + } + + void WakeupAsync(WakeupMask) final { if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) { // Can't safely run, so ask to run later. this->ScheduleWakeup(); @@ -520,8 +523,6 @@ class PromiseActivity final } } - void WakeupAsync(WakeupMask) final { Crash("not implemented"); } - // Drop a wakeup void Drop(WakeupMask) final { this->WakeupComplete(); } diff --git a/src/core/lib/promise/inter_activity_latch.h b/src/core/lib/promise/inter_activity_latch.h new file mode 100644 index 00000000000..8c31c8a1625 --- /dev/null +++ b/src/core/lib/promise/inter_activity_latch.h @@ -0,0 +1,98 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H +#define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H + +#include + +#include + +#include + +#include "absl/base/thread_annotations.h" +#include "absl/strings/str_cat.h" + +#include + +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/trace.h" +#include "src/core/lib/promise/wait_set.h" + +namespace grpc_core { + +// A latch providing true cross activity wakeups +template +class InterActivityLatch; + +template <> +class InterActivityLatch { + public: + InterActivityLatch() = default; + InterActivityLatch(const InterActivityLatch&) = delete; + InterActivityLatch& operator=(const InterActivityLatch&) = delete; + + // Produce a promise to wait for this latch. + auto Wait() { + return [this]() -> Poll { + MutexLock lock(&mu_); + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(), + StateString().c_str()); + } + if (is_set_) { + return Empty{}; + } else { + return waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); + } + }; + } + + // Set the latch. + void Set() { + MutexLock lock(&mu_); + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); + } + is_set_ = true; + waiters_.WakeupAsync(); + } + + bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) { + MutexLock lock(&mu_); + return is_set_; + } + + private: + std::string DebugTag() { + return absl::StrCat(Activity::current()->DebugTag(), + " INTER_ACTIVITY_LATCH[0x", + reinterpret_cast(this), "]: "); + } + + std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return absl::StrCat("is_set:", is_set_); + } + + mutable Mutex mu_; + // True if we have a value set, false otherwise. + bool is_set_ = false; + WaitSet waiters_ ABSL_GUARDED_BY(mu_); +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H diff --git a/src/core/lib/promise/latch.h b/src/core/lib/promise/latch.h index a8f911809f5..4ade84b8939 100644 --- a/src/core/lib/promise/latch.h +++ b/src/core/lib/promise/latch.h @@ -37,6 +37,7 @@ namespace grpc_core { // Initially the Latch is unset. // It can be waited upon by the Wait method, which produces a Promise that // resolves when the Latch is Set to a value of type T. +// Latches only work correctly within a single activity. template class Latch { public: @@ -204,6 +205,9 @@ class Latch { IntraActivityWaiter waiter_; }; +template +using LatchWaitPromise = decltype(std::declval>().Wait()); + // A Latch that can have its value observed by outside threads, but only waited // upon from inside a single activity. template @@ -268,9 +272,6 @@ class ExternallyObservableLatch { IntraActivityWaiter waiter_; }; -template -using LatchWaitPromise = decltype(std::declval>().Wait()); - } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index 040afbfe9a3..275d06445a4 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -247,7 +247,7 @@ bool Party::RunParty() { } // Poll the participant. currently_polling_ = i; - bool done = participant->Poll(); + bool done = participant->PollParticipantPromise(); currently_polling_ = kNotPolling; if (done) { if (!name.empty()) { diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index 493a00925d1..1ff098f3aa3 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -24,6 +24,7 @@ #include #include +#include "absl/base/attributes.h" #include "absl/base/thread_annotations.h" #include "absl/strings/string_view.h" @@ -38,6 +39,7 @@ #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_factory.h" +#include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/trace.h" #include "src/core/lib/resource_quota/arena.h" @@ -298,7 +300,7 @@ class Party : public Activity, private Wakeable { explicit Participant(absl::string_view name) : name_(name) {} // Poll the participant. Return true if complete. // Participant should take care of its own deallocation in this case. - virtual bool Poll() = 0; + virtual bool PollParticipantPromise() = 0; // Destroy the participant before finishing. virtual void Destroy() = 0; @@ -330,6 +332,9 @@ class Party : public Activity, private Wakeable { void Spawn(absl::string_view name, Factory promise_factory, OnComplete on_complete); + template + auto SpawnWaitable(absl::string_view name, Factory factory); + void Orphan() final { Crash("unused"); } // Activity implementation: not allowed to be overridden by derived types. @@ -414,7 +419,7 @@ class Party : public Activity, private Wakeable { } } - bool Poll() override { + bool PollParticipantPromise() override { if (!started_) { auto p = factory_.Make(); Destruct(&factory_); @@ -441,6 +446,89 @@ class Party : public Activity, private Wakeable { bool started_ = false; }; + template + class PromiseParticipantImpl final + : public RefCounted, + NonPolymorphicRefCount>, + public Participant { + using Factory = promise_detail::OncePromiseFactory; + using Promise = typename Factory::Promise; + using Result = typename Promise::Result; + + public: + PromiseParticipantImpl(absl::string_view name, + SuppliedFactory promise_factory) + : Participant(name) { + Construct(&factory_, std::move(promise_factory)); + } + + ~PromiseParticipantImpl() { + switch (state_.load(std::memory_order_acquire)) { + case State::kFactory: + Destruct(&factory_); + break; + case State::kPromise: + Destruct(&promise_); + break; + case State::kResult: + Destruct(&result_); + break; + } + } + + // Inside party poll: drive from factory -> promise -> result + bool PollParticipantPromise() override { + switch (state_.load(std::memory_order_relaxed)) { + case State::kFactory: { + auto p = factory_.Make(); + Destruct(&factory_); + Construct(&promise_, std::move(p)); + state_.store(State::kPromise, std::memory_order_relaxed); + } + ABSL_FALLTHROUGH_INTENDED; + case State::kPromise: { + auto p = promise_(); + if (auto* r = p.value_if_ready()) { + Destruct(&promise_); + Construct(&result_, std::move(*r)); + state_.store(State::kResult, std::memory_order_release); + waiter_.Wakeup(); + this->Unref(); + return true; + } + return false; + } + case State::kResult: + Crash( + "unreachable: promises should not be repolled after completion"); + } + } + + // Outside party poll: check whether the spawning party has completed this + // promise. + Poll PollCompletion() { + switch (state_.load(std::memory_order_acquire)) { + case State::kFactory: + case State::kPromise: + return Pending{}; + case State::kResult: + return std::move(result_); + } + } + + void Destroy() override { this->Unref(); } + + private: + enum class State : uint8_t { kFactory, kPromise, kResult }; + union { + GPR_NO_UNIQUE_ADDRESS Factory factory_; + GPR_NO_UNIQUE_ADDRESS Promise promise_; + GPR_NO_UNIQUE_ADDRESS Result result_; + }; + Waker waiter_{Activity::current()->MakeOwningWaker()}; + std::atomic state_{State::kFactory}; + }; + // Notification that the party has finished and this instance can be deleted. // Derived types should arrange to call CancelRemainingParticipants during // this sequence. @@ -502,6 +590,17 @@ void Party::Spawn(absl::string_view name, Factory promise_factory, std::move(on_complete)); } +template +auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) { + auto participant = MakeRefCounted>( + name, std::move(promise_factory)); + Participant* p = participant->Ref().release(); + AddParticipants(&p, 1); + return [participant = std::move(participant)]() mutable { + return participant->PollCompletion(); + }; +} + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_PARTY_H diff --git a/src/core/lib/promise/wait_set.h b/src/core/lib/promise/wait_set.h index 2f978e5750a..bf1adc6c1c2 100644 --- a/src/core/lib/promise/wait_set.h +++ b/src/core/lib/promise/wait_set.h @@ -69,6 +69,12 @@ class WaitSet final { return ret; } + void WakeupAsync() { + while (!pending_.empty()) { + pending_.extract(pending_.begin()).value().WakeupAsync(); + } + } + private: // Handles to activities that need to be awoken. WakerSet pending_; diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 5073d060b01..2370dfdc4a7 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -419,6 +419,27 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "inter_activity_latch_test", + srcs = ["inter_activity_latch_test.cc"], + external_deps = [ + "absl/status", + "gtest", + ], + language = "c++", + tags = ["promise_test"], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:grpc", + "//src/core:default_event_engine", + "//src/core:event_engine_wakeup_scheduler", + "//src/core:inter_activity_latch", + "//src/core:notification", + "//src/core:seq", + ], +) + grpc_cc_test( name = "mpsc_test", srcs = ["mpsc_test.cc"], @@ -591,6 +612,7 @@ grpc_cc_test( "//src/core:context", "//src/core:default_event_engine", "//src/core:event_engine_memory_allocator", + "//src/core:inter_activity_latch", "//src/core:memory_quota", "//src/core:notification", "//src/core:poll", diff --git a/test/core/promise/inter_activity_latch_test.cc b/test/core/promise/inter_activity_latch_test.cc new file mode 100644 index 00000000000..087489b240e --- /dev/null +++ b/test/core/promise/inter_activity_latch_test.cc @@ -0,0 +1,103 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/inter_activity_latch.h" + +#include "absl/status/status.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" +#include "src/core/lib/promise/seq.h" + +using grpc_event_engine::experimental::GetDefaultEventEngine; + +namespace grpc_core { +namespace { + +TEST(InterActivityLatchTest, Works) { + InterActivityLatch latch; + + // Start some waiting activities + Notification n1; + auto a1 = MakeActivity( + [&] { + return Seq(latch.Wait(), [&](Empty) { + n1.Notify(); + return absl::OkStatus(); + }); + }, + EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {}); + Notification n2; + auto a2 = MakeActivity( + [&] { + return Seq(latch.Wait(), [&](Empty) { + n2.Notify(); + return absl::OkStatus(); + }); + }, + EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {}); + Notification n3; + auto a3 = MakeActivity( + [&] { + return Seq(latch.Wait(), [&](Empty) { + n3.Notify(); + return absl::OkStatus(); + }); + }, + EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {}); + + ASSERT_FALSE(n1.HasBeenNotified()); + ASSERT_FALSE(n2.HasBeenNotified()); + ASSERT_FALSE(n3.HasBeenNotified()); + + // Start a setting activity + auto kicker = MakeActivity( + [&] { + latch.Set(); + return absl::OkStatus(); + }, + EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {}); + + // Start another waiting activity + Notification n4; + auto a4 = MakeActivity( + [&] { + return Seq(latch.Wait(), [&](Empty) { + n4.Notify(); + return absl::OkStatus(); + }); + }, + EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {}); + + // Everything should finish + n1.WaitForNotification(); + n2.WaitForNotification(); + n3.WaitForNotification(); + n4.WaitForNotification(); +} + +} // namespace +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); // for GetDefaultEventEngine + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; +} diff --git a/test/core/promise/party_test.cc b/test/core/promise/party_test.cc index a15b10a985b..551443ac4d5 100644 --- a/test/core/promise/party_test.cc +++ b/test/core/promise/party_test.cc @@ -36,6 +36,7 @@ #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/inter_activity_latch.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/sleep.h" @@ -298,6 +299,31 @@ TEST_F(PartyTest, CanSpawnAndRun) { n.WaitForNotification(); } +TEST_F(PartyTest, CanSpawnWaitableAndRun) { + auto party1 = MakeRefCounted(); + auto party2 = MakeRefCounted(); + Notification n; + InterActivityLatch done; + // Spawn a task on party1 that will wait for a task on party2. + // The party2 task will wait on the latch `done`. + party1->Spawn( + "party1_main", + [&party2, &done]() { + return party2->SpawnWaitable("party2_main", + [&done]() { return done.Wait(); }); + }, + [&n](Empty) { n.Notify(); }); + ASSERT_FALSE(n.HasBeenNotified()); + party1->Spawn( + "party1_notify_latch", + [&done]() { + done.Set(); + return Empty{}; + }, + [](Empty) {}); + n.WaitForNotification(); +} + TEST_F(PartyTest, CanSpawnFromSpawn) { auto party = MakeRefCounted(); Notification n1; diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f84a171ea9f..35bdddc0eb0 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4987,6 +4987,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "inter_activity_latch_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, From cb21299ea33644370c300145e192d8ac8a26015d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Nov 2023 21:37:39 -0800 Subject: [PATCH 7/7] Internal change PiperOrigin-RevId: 582536567 --- OWNERS | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 OWNERS diff --git a/OWNERS b/OWNERS deleted file mode 100644 index cdbc4bec0ed..00000000000 --- a/OWNERS +++ /dev/null @@ -1,3 +0,0 @@ -# Top level ownership -@markdroth **/OWNERS -@a11r **/OWNERS