[promises] Re-enable C++ end2end tests (with fixes) (#32837)

Makes some awkward fixes to compression filter, call, connected channel
to hold the semantics we have upheld now in tests.

Once the fixes described here
https://github.com/grpc/grpc/blob/master/src/core/lib/channel/connected_channel.cc#L636
are in this gets a lot less ad-hoc, but that's likely going to be
post-landing promises client & server side.

We specifically need special handling for server side cancellation in
response to reads wrt the inproc transport - which doesn't track
cancellation thoroughly enough itself.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/33008/head
Craig Tiller 2 years ago committed by GitHub
parent 65a2a895af
commit ad41fe96b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 38
      CMakeLists.txt
  3. 6
      bazel/experiments.bzl
  4. 14
      build_autogenerated.yaml
  5. 2
      gRPC-C++.podspec
  6. 2
      gRPC-Core.podspec
  7. 1
      grpc.gemspec
  8. 1
      package.xml
  9. 7
      src/core/BUILD
  10. 6
      src/core/ext/filters/http/message_compress/compression_filter.cc
  11. 4
      src/core/lib/experiments/experiments.yaml
  12. 95
      src/core/lib/promise/prioritized_race.h
  13. 12
      src/core/lib/surface/call.cc
  14. 14
      test/core/promise/BUILD
  15. 80
      test/core/promise/prioritized_race_test.cc
  16. 1
      tools/doxygen/Doxyfile.c++.internal
  17. 1
      tools/doxygen/Doxyfile.core.internal
  18. 24
      tools/run_tests/generated/tests.json

@ -3470,6 +3470,7 @@ grpc_cc_library(
"//src/core:percent_encoding",
"//src/core:pipe",
"//src/core:poll",
"//src/core:prioritized_race",
"//src/core:race",
"//src/core:slice",
"//src/core:slice_buffer",

38
CMakeLists.txt generated

@ -1115,6 +1115,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
add_dependencies(buildtests_cxx prioritized_race_test)
add_dependencies(buildtests_cxx promise_factory_test)
add_dependencies(buildtests_cxx promise_map_test)
add_dependencies(buildtests_cxx promise_test)
@ -16703,6 +16704,43 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(prioritized_race_test
test/core/promise/prioritized_race_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(prioritized_race_test PUBLIC cxx_std_14)
target_include_directories(prioritized_race_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(prioritized_race_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(promise_factory_test
test/core/promise/promise_factory_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -28,6 +28,9 @@ EXPERIMENTS = {
"promise_based_client_call",
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -51,6 +54,9 @@ EXPERIMENTS = {
"memory_pressure_controller",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"flow_control_test": [

@ -845,6 +845,7 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@ -2204,6 +2205,7 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@ -10109,6 +10111,18 @@ targets:
- linux
- posix
- mac
- name: prioritized_race_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/promise/poll.h
- src/core/lib/promise/prioritized_race.h
src:
- test/core/promise/prioritized_race_test.cc
deps:
- gpr
uses_polling: false
- name: promise_factory_test
gtest: true
build: test

2
gRPC-C++.podspec generated

@ -940,6 +940,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
@ -1966,6 +1967,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',

2
gRPC-Core.podspec generated

@ -1522,6 +1522,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
@ -2685,6 +2686,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',

1
grpc.gemspec generated

@ -1428,6 +1428,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/party.h )
s.files += %w( src/core/lib/promise/pipe.h )
s.files += %w( src/core/lib/promise/poll.h )
s.files += %w( src/core/lib/promise/prioritized_race.h )
s.files += %w( src/core/lib/promise/promise.h )
s.files += %w( src/core/lib/promise/race.h )
s.files += %w( src/core/lib/promise/seq.h )

1
package.xml generated

@ -1410,6 +1410,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/party.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/poll.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/prioritized_race.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/race.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/seq.h" role="src" />

@ -573,6 +573,13 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "prioritized_race",
language = "c++",
public_hdrs = ["lib/promise/prioritized_race.h"],
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "loop",
external_deps = [

@ -49,7 +49,7 @@
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
@ -273,7 +273,7 @@ ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return Race(decompress_err->Wait(),
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
@ -316,7 +316,7 @@ ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
return Race(decompress_err->Wait(),
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}

@ -112,9 +112,7 @@
default: false
expiry: 2023/06/01
owner: ctiller@google.com
test_tags: ["core_end2end_test"]
# removing these until I can stabilize tests a little further
disabled_test_tags: ["cpp_end2end_test", "xds_end2end_test"]
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test"]
- name: transport_supplies_client_latency
description: If set, use the transport represented value for client latency in opencensus
default: false

@ -0,0 +1,95 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H
#define GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H
#include <grpc/support/port_platform.h>
#include <type_traits>
#include <utility>
namespace grpc_core {
namespace promise_detail {
template <typename A, typename B>
class TwoPartyPrioritizedRace {
public:
using Result = decltype(std::declval<A>()());
explicit TwoPartyPrioritizedRace(A a, B b)
: a_(std::move(a)), b_(std::move(b)) {}
Result operator()() {
// Check the priority promise.
auto p = a_();
if (p.ready()) return p;
// Check the other promise.
p = b_();
if (p.ready()) {
// re-poll a to see if it's also completed.
auto q = a_();
if (q.ready()) {
// both are ready, but a is prioritized
return q;
}
}
return p;
}
private:
A a_;
B b_;
};
template <typename... Promises>
class PrioritizedRace;
template <typename Promise, typename... Promises>
class PrioritizedRace<Promise, Promises...>
: public TwoPartyPrioritizedRace<Promise, PrioritizedRace<Promises...>> {
public:
using Result = decltype(std::declval<Promise>()());
explicit PrioritizedRace(Promise promise, Promises... promises)
: TwoPartyPrioritizedRace<Promise, PrioritizedRace<Promises...>>(
std::move(promise),
PrioritizedRace<Promises...>(std::move(promises)...)) {}
};
template <typename Promise>
class PrioritizedRace<Promise> {
public:
using Result = decltype(std::declval<Promise>()());
explicit PrioritizedRace(Promise promise) : promise_(std::move(promise)) {}
Result operator()() { return promise_(); }
private:
Promise promise_;
};
} // namespace promise_detail
/// Run all the promises until one is non-pending.
/// Once there's a non-pending promise, repoll all the promises before that.
/// Return the result from the lexically first non-pending promise.
template <typename... Promises>
promise_detail::PrioritizedRace<Promises...> PrioritizedRace(
Promises... promises) {
return promise_detail::PrioritizedRace<Promises...>(std::move(promises)...);
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H

@ -2183,7 +2183,7 @@ class PromiseBasedCall : public Call,
void StartRecvMessage(const grpc_op& op, const Completion& completion,
FirstPromise first,
PipeReceiver<MessageHandle>* receiver,
Party::BulkSpawner& spawner);
bool cancel_on_error, Party::BulkSpawner& spawner);
void StartSendMessage(const grpc_op& op, const Completion& completion,
PipeSender<MessageHandle>* sender,
Party::BulkSpawner& spawner);
@ -2538,7 +2538,8 @@ template <typename FirstPromiseFactory>
void PromiseBasedCall::StartRecvMessage(
const grpc_op& op, const Completion& completion,
FirstPromiseFactory first_promise_factory,
PipeReceiver<MessageHandle>* receiver, Party::BulkSpawner& spawner) {
PipeReceiver<MessageHandle>* receiver, bool cancel_on_error,
Party::BulkSpawner& spawner) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[call] Start RecvMessage: %s", DebugTag().c_str(),
CompletionString(completion).c_str());
@ -2549,7 +2550,7 @@ void PromiseBasedCall::StartRecvMessage(
[first_promise_factory = std::move(first_promise_factory), receiver]() {
return Seq(first_promise_factory(), receiver->Next());
},
[this,
[this, cancel_on_error,
completion = AddOpToCompletion(completion, PendingOp::kReceiveMessage)](
NextResult<MessageHandle> result) mutable {
if (result.has_value()) {
@ -2580,6 +2581,7 @@ void PromiseBasedCall::StartRecvMessage(
}
failed_before_recv_message_.store(true);
FailCompletion(completion);
if (cancel_on_error) CancelWithError(absl::CancelledError());
*recv_message_ = nullptr;
} else {
if (grpc_call_trace.enabled()) {
@ -2873,7 +2875,7 @@ void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
[this]() {
return server_initial_metadata_.receiver.AwaitClosed();
},
&server_to_client_messages_.receiver, spawner);
&server_to_client_messages_.receiver, false, spawner);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
spawner.Spawn(
@ -3328,7 +3330,7 @@ void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
}
StartRecvMessage(
op, completion, []() { return []() { return Empty{}; }; },
client_to_server_messages_, spawner);
client_to_server_messages_, true, spawner);
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
auto metadata = arena()->MakePooled<ServerMetadata>(arena());

@ -155,6 +155,20 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "prioritized_race_test",
srcs = ["prioritized_race_test.cc"],
external_deps = ["gtest"],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:poll",
"//src/core:prioritized_race",
],
)
grpc_cc_test(
name = "promise_factory_test",
srcs = ["promise_factory_test.cc"],

@ -0,0 +1,80 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/prioritized_race.h"
#include "gtest/gtest.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
Poll<int> instant() { return 1; }
Poll<int> never() { return Pending(); }
TEST(PrioritizedRaceTest, Race1) {
EXPECT_EQ(PrioritizedRace(instant)(), Poll<int>(1));
}
TEST(PrioritizedRaceTest, Race2A) {
EXPECT_EQ(PrioritizedRace(instant, never)(), Poll<int>(1));
}
TEST(PrioritizedRaceTest, Race2B) {
EXPECT_EQ(PrioritizedRace(never, instant)(), Poll<int>(1));
}
TEST(PrioritizedRaceTest, PrioritizedCompletion2A) {
int first_polls = 0;
int second_polls = 0;
auto r = PrioritizedRace(
[&first_polls]() -> Poll<int> {
++first_polls;
return 1;
},
[&second_polls]() {
++second_polls;
return 2;
})();
EXPECT_EQ(r, Poll<int>(1));
// First promise completes immediately, so second promise is never polled.
EXPECT_EQ(first_polls, 1);
EXPECT_EQ(second_polls, 0);
}
TEST(PrioritizedRaceTest, PrioritizedCompletion2B) {
int first_polls = 0;
int second_polls = 0;
auto r = PrioritizedRace(
[&first_polls]() -> Poll<int> {
++first_polls;
if (first_polls > 1) return 1;
return Pending{};
},
[&second_polls]() {
++second_polls;
return 2;
})();
EXPECT_EQ(r, Poll<int>(1));
// First promise completes after second promise is polled.
EXPECT_EQ(first_polls, 2);
EXPECT_EQ(second_polls, 1);
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -2424,6 +2424,7 @@ src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
src/core/lib/promise/prioritized_race.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \

@ -2205,6 +2205,7 @@ src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
src/core/lib/promise/prioritized_race.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \

@ -5561,6 +5561,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "prioritized_race_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save