[promises] Fix ordering problems shown up deploying experiment internally (#33779)

Resolves a set of failures seen rolling out promises - we need to read
all of the incoming payload before doing request matching.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/33895/head
Craig Tiller 2 years ago committed by GitHub
parent 8c2c35785e
commit dd3279ab53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      Package.swift
  3. 10
      build_autogenerated.yaml
  4. 4
      gRPC-C++.podspec
  5. 4
      gRPC-Core.podspec
  6. 2
      grpc.gemspec
  7. 2
      package.xml
  8. 52
      src/core/lib/surface/server.cc
  9. 2
      tools/doxygen/Doxyfile.c++.internal
  10. 2
      tools/doxygen/Doxyfile.core.internal

@ -1534,7 +1534,6 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:basic_join",
"//src/core:basic_seq",
"//src/core:bitset",
"//src/core:cancel_callback",
@ -1603,7 +1602,6 @@ grpc_cc_library(
"//src/core:thread_quota",
"//src/core:time",
"//src/core:transport_fwd",
"//src/core:try_join",
"//src/core:try_seq",
"//src/core:type_list",
"//src/core:useful",

2
Package.swift generated

@ -1442,7 +1442,6 @@ let package = Package(
"src/core/lib/promise/arena_promise.h",
"src/core/lib/promise/cancel_callback.h",
"src/core/lib/promise/context.h",
"src/core/lib/promise/detail/basic_join.h",
"src/core/lib/promise/detail/basic_seq.h",
"src/core/lib/promise/detail/promise_factory.h",
"src/core/lib/promise/detail/promise_like.h",
@ -1467,7 +1466,6 @@ let package = Package(
"src/core/lib/promise/sleep.h",
"src/core/lib/promise/trace.cc",
"src/core/lib/promise/trace.h",
"src/core/lib/promise/try_join.h",
"src/core/lib/promise/try_seq.h",
"src/core/lib/resolver/resolver.cc",
"src/core/lib/resolver/resolver.h",

@ -853,7 +853,6 @@ libs:
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -875,7 +874,6 @@ libs:
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -2249,7 +2247,6 @@ libs:
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -2271,7 +2268,6 @@ libs:
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -3753,7 +3749,6 @@ libs:
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -3773,7 +3768,6 @@ libs:
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -8274,7 +8268,6 @@ targets:
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -8294,7 +8287,6 @@ targets:
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -11449,6 +11441,7 @@ targets:
build: test
language: c++
headers:
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/join.h
- test/core/promise/test_wakeup_schedulers.h
src:
@ -11584,6 +11577,7 @@ targets:
build: test
language: c++
headers:
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/join.h
- src/core/lib/transport/promise_endpoint.h
- test/core/promise/test_wakeup_schedulers.h

4
gRPC-C++.podspec generated

@ -947,7 +947,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -969,7 +968,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
@ -1998,7 +1996,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -2020,7 +2017,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',

4
gRPC-Core.podspec generated

@ -1543,7 +1543,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -1568,7 +1567,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.cc',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver.h',
@ -2733,7 +2731,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -2755,7 +2752,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',

2
grpc.gemspec generated

@ -1448,7 +1448,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/cancel_callback.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_join.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )
s.files += %w( src/core/lib/promise/detail/promise_like.h )
@ -1473,7 +1472,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/sleep.h )
s.files += %w( src/core/lib/promise/trace.cc )
s.files += %w( src/core/lib/promise/trace.h )
s.files += %w( src/core/lib/promise/try_join.h )
s.files += %w( src/core/lib/promise/try_seq.h )
s.files += %w( src/core/lib/resolver/resolver.cc )
s.files += %w( src/core/lib/resolver/resolver.h )

2
package.xml generated

@ -1430,7 +1430,6 @@
<file baseinstalldir="/" name="src/core/lib/promise/arena_promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/cancel_callback.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_join.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_like.h" role="src" />
@ -1455,7 +1454,6 @@
<file baseinstalldir="/" name="src/core/lib/promise/sleep.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/trace.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/try_join.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/try_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.h" role="src" />

@ -28,7 +28,6 @@
#include <list>
#include <new>
#include <queue>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
@ -62,13 +61,11 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@ -322,19 +319,18 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
while (true) {
NextPendingCall next_pending = pop_next_pending();
if (next_pending.rc == nullptr) break;
auto mr = MatchResult(server(), request_queue_index, next_pending.rc);
Match(
next_pending.pending,
[&mr](CallData* calld) {
[&](CallData* calld) {
if (!calld->MaybeActivate()) {
// Zombied Call
calld->KillZombie();
} else {
calld->Publish(mr.cq_idx(), mr.TakeCall());
calld->Publish(request_queue_index, next_pending.rc);
}
},
[&mr](const std::shared_ptr<ActivityWaiter>& w) {
w->Finish(std::move(mr));
[&](const std::shared_ptr<ActivityWaiter>& w) {
w->Finish(server(), request_queue_index, next_pending.rc);
});
}
}
@ -430,8 +426,14 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
struct ActivityWaiter {
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
void Finish(absl::StatusOr<MatchResult> r) {
result.store(new absl::StatusOr<MatchResult>(std::move(r)),
void Finish(absl::Status status) {
result.store(new absl::StatusOr<MatchResult>(std::move(status)),
std::memory_order_release);
waker.Wakeup();
}
void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) {
result.store(new absl::StatusOr<MatchResult>(
MatchResult(server, cq_idx, requested_call)),
std::memory_order_release);
waker.Wakeup();
}
@ -1336,22 +1338,32 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
matcher = server->unregistered_request_matcher_.get();
}
return TrySeq(
TryJoin(matcher->MatchRequest(chand->cq_idx()),
std::move(maybe_read_first_message)),
[path = std::move(*path), host_ptr, deadline,
call_args = std::move(call_args)](
std::tuple<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>
match_result_and_payload) mutable {
auto& mr = std::get<0>(match_result_and_payload);
auto& payload = std::get<1>(match_result_and_payload);
std::move(maybe_read_first_message),
[matcher, chand](NextResult<MessageHandle> payload) {
return Map(
matcher->MatchRequest(chand->cq_idx()),
[payload = std::move(payload)](
absl::StatusOr<RequestMatcherInterface::MatchResult> mr) mutable
-> absl::StatusOr<std::pair<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>> {
if (!mr.ok()) return mr.status();
return std::make_pair(std::move(*mr), std::move(payload));
});
},
[host_ptr, path = std::move(path), deadline,
call_args =
std::move(call_args)](std::pair<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>
r) mutable {
auto& mr = r.first;
auto& payload = r.second;
auto* rc = mr.TakeCall();
auto* cq_for_new_request = mr.cq();
switch (rc->type) {
case RequestedCall::Type::BATCH_CALL:
GPR_ASSERT(!payload.has_value());
rc->data.batch.details->host = CSliceRef(host_ptr->c_slice());
rc->data.batch.details->method = CSliceRef(path.c_slice());
rc->data.batch.details->method = CSliceRef(path->c_slice());
rc->data.batch.details->deadline =
deadline.as_timespec(GPR_CLOCK_MONOTONIC);
break;

@ -2445,7 +2445,6 @@ src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/cancel_callback.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
@ -2470,7 +2469,6 @@ src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/trace.cc \
src/core/lib/promise/trace.h \
src/core/lib/promise/try_join.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \

@ -2226,7 +2226,6 @@ src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/cancel_callback.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
@ -2251,7 +2250,6 @@ src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/trace.cc \
src/core/lib/promise/trace.h \
src/core/lib/promise/try_join.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \

Loading…
Cancel
Save