From dd3279ab53dd6fb97e9b61dd4dff3d86d51f1580 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 26 Jul 2023 14:47:31 -0700 Subject: [PATCH] [promises] Fix ordering problems shown up deploying experiment internally (#33779) Resolves a set of failures seen rolling out promises - we need to read all of the incoming payload before doing request matching. --------- Co-authored-by: ctiller --- BUILD | 2 -- Package.swift | 2 -- build_autogenerated.yaml | 10 ++---- gRPC-C++.podspec | 4 --- gRPC-Core.podspec | 4 --- grpc.gemspec | 2 -- package.xml | 2 -- src/core/lib/surface/server.cc | 52 +++++++++++++++++----------- tools/doxygen/Doxyfile.c++.internal | 2 -- tools/doxygen/Doxyfile.core.internal | 2 -- 10 files changed, 34 insertions(+), 48 deletions(-) diff --git a/BUILD b/BUILD index 1b81c905824..37f59e2115c 100644 --- a/BUILD +++ b/BUILD @@ -1534,7 +1534,6 @@ grpc_cc_library( "//src/core:arena", "//src/core:arena_promise", "//src/core:atomic_utils", - "//src/core:basic_join", "//src/core:basic_seq", "//src/core:bitset", "//src/core:cancel_callback", @@ -1603,7 +1602,6 @@ grpc_cc_library( "//src/core:thread_quota", "//src/core:time", "//src/core:transport_fwd", - "//src/core:try_join", "//src/core:try_seq", "//src/core:type_list", "//src/core:useful", diff --git a/Package.swift b/Package.swift index 62ca96e64a4..b712cd822d9 100644 --- a/Package.swift +++ b/Package.swift @@ -1442,7 +1442,6 @@ let package = Package( "src/core/lib/promise/arena_promise.h", "src/core/lib/promise/cancel_callback.h", "src/core/lib/promise/context.h", - "src/core/lib/promise/detail/basic_join.h", "src/core/lib/promise/detail/basic_seq.h", "src/core/lib/promise/detail/promise_factory.h", "src/core/lib/promise/detail/promise_like.h", @@ -1467,7 +1466,6 @@ let package = Package( "src/core/lib/promise/sleep.h", "src/core/lib/promise/trace.cc", "src/core/lib/promise/trace.h", - "src/core/lib/promise/try_join.h", "src/core/lib/promise/try_seq.h", "src/core/lib/resolver/resolver.cc", "src/core/lib/resolver/resolver.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index d7a2af54dd2..4be7bf55a5a 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -853,7 +853,6 @@ libs: - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h @@ -875,7 +874,6 @@ libs: - src/core/lib/promise/seq.h - src/core/lib/promise/sleep.h - src/core/lib/promise/trace.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -2249,7 +2247,6 @@ libs: - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h @@ -2271,7 +2268,6 @@ libs: - src/core/lib/promise/seq.h - src/core/lib/promise/sleep.h - src/core/lib/promise/trace.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -3753,7 +3749,6 @@ libs: - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h @@ -3773,7 +3768,6 @@ libs: - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - src/core/lib/promise/trace.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -8274,7 +8268,6 @@ targets: - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h @@ -8294,7 +8287,6 @@ targets: - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - src/core/lib/promise/trace.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -11449,6 +11441,7 @@ targets: build: test language: c++ headers: + - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/join.h - test/core/promise/test_wakeup_schedulers.h src: @@ -11584,6 +11577,7 @@ targets: build: test language: c++ headers: + - src/core/lib/promise/detail/basic_join.h - src/core/lib/promise/join.h - src/core/lib/transport/promise_endpoint.h - test/core/promise/test_wakeup_schedulers.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index a32da715385..0ff4a859756 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -947,7 +947,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', - 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', @@ -969,7 +968,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/trace.h', - 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', @@ -1998,7 +1996,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', - 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', @@ -2020,7 +2017,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/trace.h', - 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index a2b15673bf4..1f7ee059f52 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1543,7 +1543,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', - 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', @@ -1568,7 +1567,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/trace.cc', 'src/core/lib/promise/trace.h', - 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver.h', @@ -2733,7 +2731,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', - 'src/core/lib/promise/detail/basic_join.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', @@ -2755,7 +2752,6 @@ Pod::Spec.new do |s| 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/trace.h', - 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', diff --git a/grpc.gemspec b/grpc.gemspec index 67a4bfcf084..8eecc38bea7 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1448,7 +1448,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/arena_promise.h ) s.files += %w( src/core/lib/promise/cancel_callback.h ) s.files += %w( src/core/lib/promise/context.h ) - s.files += %w( src/core/lib/promise/detail/basic_join.h ) s.files += %w( src/core/lib/promise/detail/basic_seq.h ) s.files += %w( src/core/lib/promise/detail/promise_factory.h ) s.files += %w( src/core/lib/promise/detail/promise_like.h ) @@ -1473,7 +1472,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/sleep.h ) s.files += %w( src/core/lib/promise/trace.cc ) s.files += %w( src/core/lib/promise/trace.h ) - s.files += %w( src/core/lib/promise/try_join.h ) s.files += %w( src/core/lib/promise/try_seq.h ) s.files += %w( src/core/lib/resolver/resolver.cc ) s.files += %w( src/core/lib/resolver/resolver.h ) diff --git a/package.xml b/package.xml index 7a21ab7ac7d..00f44d11922 100644 --- a/package.xml +++ b/package.xml @@ -1430,7 +1430,6 @@ - @@ -1455,7 +1454,6 @@ - diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index e2a2bbca1f7..264d8e9ba1c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -28,7 +28,6 @@ #include #include #include -#include #include #include #include @@ -62,13 +61,11 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/detail/basic_join.h" #include "src/core/lib/promise/detail/basic_seq.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/promise.h" -#include "src/core/lib/promise/try_join.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" @@ -322,19 +319,18 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { while (true) { NextPendingCall next_pending = pop_next_pending(); if (next_pending.rc == nullptr) break; - auto mr = MatchResult(server(), request_queue_index, next_pending.rc); Match( next_pending.pending, - [&mr](CallData* calld) { + [&](CallData* calld) { if (!calld->MaybeActivate()) { // Zombied Call calld->KillZombie(); } else { - calld->Publish(mr.cq_idx(), mr.TakeCall()); + calld->Publish(request_queue_index, next_pending.rc); } }, - [&mr](const std::shared_ptr& w) { - w->Finish(std::move(mr)); + [&](const std::shared_ptr& w) { + w->Finish(server(), request_queue_index, next_pending.rc); }); } } @@ -430,8 +426,14 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { struct ActivityWaiter { explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} ~ActivityWaiter() { delete result.load(std::memory_order_acquire); } - void Finish(absl::StatusOr r) { - result.store(new absl::StatusOr(std::move(r)), + void Finish(absl::Status status) { + result.store(new absl::StatusOr(std::move(status)), + std::memory_order_release); + waker.Wakeup(); + } + void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) { + result.store(new absl::StatusOr( + MatchResult(server, cq_idx, requested_call)), std::memory_order_release); waker.Wakeup(); } @@ -1336,22 +1338,32 @@ ArenaPromise Server::ChannelData::MakeCallPromise( matcher = server->unregistered_request_matcher_.get(); } return TrySeq( - TryJoin(matcher->MatchRequest(chand->cq_idx()), - std::move(maybe_read_first_message)), - [path = std::move(*path), host_ptr, deadline, - call_args = std::move(call_args)]( - std::tuple> - match_result_and_payload) mutable { - auto& mr = std::get<0>(match_result_and_payload); - auto& payload = std::get<1>(match_result_and_payload); + std::move(maybe_read_first_message), + [matcher, chand](NextResult payload) { + return Map( + matcher->MatchRequest(chand->cq_idx()), + [payload = std::move(payload)]( + absl::StatusOr mr) mutable + -> absl::StatusOr>> { + if (!mr.ok()) return mr.status(); + return std::make_pair(std::move(*mr), std::move(payload)); + }); + }, + [host_ptr, path = std::move(path), deadline, + call_args = + std::move(call_args)](std::pair> + r) mutable { + auto& mr = r.first; + auto& payload = r.second; auto* rc = mr.TakeCall(); auto* cq_for_new_request = mr.cq(); switch (rc->type) { case RequestedCall::Type::BATCH_CALL: GPR_ASSERT(!payload.has_value()); rc->data.batch.details->host = CSliceRef(host_ptr->c_slice()); - rc->data.batch.details->method = CSliceRef(path.c_slice()); + rc->data.batch.details->method = CSliceRef(path->c_slice()); rc->data.batch.details->deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC); break; diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 4e3efe3e044..ab7809664f8 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2445,7 +2445,6 @@ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ src/core/lib/promise/cancel_callback.h \ src/core/lib/promise/context.h \ -src/core/lib/promise/detail/basic_join.h \ src/core/lib/promise/detail/basic_seq.h \ src/core/lib/promise/detail/promise_factory.h \ src/core/lib/promise/detail/promise_like.h \ @@ -2470,7 +2469,6 @@ src/core/lib/promise/sleep.cc \ src/core/lib/promise/sleep.h \ src/core/lib/promise/trace.cc \ src/core/lib/promise/trace.h \ -src/core/lib/promise/try_join.h \ src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index e778bb99a8e..d5b0b9258ab 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2226,7 +2226,6 @@ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ src/core/lib/promise/cancel_callback.h \ src/core/lib/promise/context.h \ -src/core/lib/promise/detail/basic_join.h \ src/core/lib/promise/detail/basic_seq.h \ src/core/lib/promise/detail/promise_factory.h \ src/core/lib/promise/detail/promise_like.h \ @@ -2251,7 +2250,6 @@ src/core/lib/promise/sleep.cc \ src/core/lib/promise/sleep.h \ src/core/lib/promise/trace.cc \ src/core/lib/promise/trace.h \ -src/core/lib/promise/try_join.h \ src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \