From 1751f1043ed00cb064907bf6943c7fcad72516af Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Jan 2024 20:06:39 -0800 Subject: [PATCH] [transport] Add a transport test suite for promise based transports (#35476) Implemented for inproc & chaotic-good Closes #35476 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35476 from ctiller:v3-svall 5358538d548b98bdc6fdda0fae772d222695cd0d PiperOrigin-RevId: 599701775 --- BUILD | 1 + CMakeLists.txt | 126 ++++ build_autogenerated.yaml | 70 ++ src/core/BUILD | 2 + .../chaotic_good/chaotic_good_transport.cc | 2 + .../chaotic_good/chaotic_good_transport.h | 28 +- .../chaotic_good/client_transport.cc | 18 +- .../chaotic_good/server_transport.cc | 190 +++-- .../transport/chaotic_good/server_transport.h | 10 +- .../ext/transport/inproc/inproc_transport.cc | 33 +- .../ext/transport/inproc/inproc_transport.h | 8 + src/core/lib/gprpp/debug_location.h | 2 + src/core/lib/promise/for_each.h | 13 +- src/core/lib/promise/inter_activity_pipe.h | 20 +- src/core/lib/promise/observable.h | 2 +- src/core/lib/promise/status_flag.h | 1 + src/core/lib/transport/promise_endpoint.cc | 3 - src/core/lib/transport/transport.cc | 51 +- src/core/lib/transport/transport.h | 50 +- test/core/promise/inter_activity_pipe_test.cc | 44 +- .../client_transport_error_test.cc | 12 +- .../chaotic_good/client_transport_test.cc | 22 +- test/core/transport/test_suite/BUILD | 165 +++++ .../core/transport/test_suite/call_content.cc | 151 ++++ test/core/transport/test_suite/call_shapes.cc | 681 ++++++++++++++++++ .../test_suite/chaotic_good_fixture.cc | 117 +++ .../test_suite/corpus/chaotic_good/empty | 1 + .../transport/test_suite/corpus/inproc/empty | 1 + test/core/transport/test_suite/fixture.cc | 29 + test/core/transport/test_suite/fixture.h | 77 ++ test/core/transport/test_suite/fuzzer.proto | 29 + test/core/transport/test_suite/fuzzer_main.cc | 72 ++ .../test_suite/grpc_transport_test.bzl | 58 ++ .../transport/test_suite/inproc_fixture.cc | 25 + test/core/transport/test_suite/no_op.cc | 28 + test/core/transport/test_suite/stress.cc | 131 ++++ test/core/transport/test_suite/test.cc | 271 +++++++ test/core/transport/test_suite/test.h | 364 ++++++++++ test/core/transport/test_suite/test_main.cc | 42 ++ tools/distrib/fix_build_deps.py | 22 +- tools/run_tests/generated/tests.json | 40 + 41 files changed, 2846 insertions(+), 166 deletions(-) create mode 100644 test/core/transport/test_suite/BUILD create mode 100644 test/core/transport/test_suite/call_content.cc create mode 100644 test/core/transport/test_suite/call_shapes.cc create mode 100644 test/core/transport/test_suite/chaotic_good_fixture.cc create mode 100644 test/core/transport/test_suite/corpus/chaotic_good/empty create mode 100644 test/core/transport/test_suite/corpus/inproc/empty create mode 100644 test/core/transport/test_suite/fixture.cc create mode 100644 test/core/transport/test_suite/fixture.h create mode 100644 test/core/transport/test_suite/fuzzer.proto create mode 100644 test/core/transport/test_suite/fuzzer_main.cc create mode 100644 test/core/transport/test_suite/grpc_transport_test.bzl create mode 100644 test/core/transport/test_suite/inproc_fixture.cc create mode 100644 test/core/transport/test_suite/no_op.cc create mode 100644 test/core/transport/test_suite/stress.cc create mode 100644 test/core/transport/test_suite/test.cc create mode 100644 test/core/transport/test_suite/test.h create mode 100644 test/core/transport/test_suite/test_main.cc diff --git a/BUILD b/BUILD index d93ccef6408..030eeb0605f 100644 --- a/BUILD +++ b/BUILD @@ -1580,6 +1580,7 @@ grpc_cc_library( "//src/core:poll", "//src/core:pollset_set", "//src/core:posix_event_engine_base_hdrs", + "//src/core:prioritized_race", "//src/core:promise_status", "//src/core:race", "//src/core:random_early_detection", diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d64f9eaf24..1f2bc20c696 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -957,6 +957,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx channel_trace_test) add_dependencies(buildtests_cxx channelz_registry_test) add_dependencies(buildtests_cxx channelz_service_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx chaotic_good_test) + endif() add_dependencies(buildtests_cxx check_gcp_environment_linux_test) add_dependencies(buildtests_cxx check_gcp_environment_windows_test) add_dependencies(buildtests_cxx chunked_vector_test) @@ -1129,6 +1132,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx if_test) add_dependencies(buildtests_cxx init_test) add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx inproc_test) + endif() add_dependencies(buildtests_cxx insecure_security_connector_test) add_dependencies(buildtests_cxx inter_activity_latch_test) add_dependencies(buildtests_cxx inter_activity_pipe_test) @@ -9884,6 +9890,69 @@ target_link_libraries(channelz_service_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + + add_executable(chaotic_good_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + src/core/ext/transport/chaotic_good/chaotic_good_transport.cc + src/core/ext/transport/chaotic_good/client_transport.cc + src/core/ext/transport/chaotic_good/frame.cc + src/core/ext/transport/chaotic_good/frame_header.cc + src/core/ext/transport/chaotic_good/server_transport.cc + src/core/lib/transport/promise_endpoint.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/test_suite/call_content.cc + test/core/transport/test_suite/call_shapes.cc + test/core/transport/test_suite/chaotic_good_fixture.cc + test/core/transport/test_suite/fixture.cc + test/core/transport/test_suite/no_op.cc + test/core/transport/test_suite/stress.cc + test/core/transport/test_suite/test.cc + test/core/transport/test_suite/test_main.cc + ) + if(WIN32 AND MSVC) + if(BUILD_SHARED_LIBS) + target_compile_definitions(chaotic_good_test + PRIVATE + "GPR_DLL_IMPORTS" + "GRPC_DLL_IMPORTS" + ) + endif() + endif() + target_compile_features(chaotic_good_test PUBLIC cxx_std_14) + target_include_directories(chaotic_good_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(chaotic_good_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) @@ -17208,6 +17277,63 @@ target_link_libraries(initial_settings_frame_bad_client_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + + add_executable(inproc_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/test_suite/call_content.cc + test/core/transport/test_suite/call_shapes.cc + test/core/transport/test_suite/fixture.cc + test/core/transport/test_suite/inproc_fixture.cc + test/core/transport/test_suite/no_op.cc + test/core/transport/test_suite/stress.cc + test/core/transport/test_suite/test.cc + test/core/transport/test_suite/test_main.cc + ) + if(WIN32 AND MSVC) + if(BUILD_SHARED_LIBS) + target_compile_definitions(inproc_test + PRIVATE + "GPR_DLL_IMPORTS" + "GRPC_DLL_IMPORTS" + ) + endif() + endif() + target_compile_features(inproc_test PUBLIC cxx_std_14) + target_include_directories(inproc_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(inproc_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 1409be0579a..d34ea3fc363 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -4572,6 +4572,7 @@ libs: - src/core/lib/promise/party.h - src/core/lib/promise/pipe.h - src/core/lib/promise/poll.h + - src/core/lib/promise/prioritized_race.h - src/core/lib/promise/promise.h - src/core/lib/promise/race.h - src/core/lib/promise/seq.h @@ -7262,6 +7263,49 @@ targets: - gtest - grpcpp_channelz - grpc++_test_util +- name: chaotic_good_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/chaotic_good/chaotic_good_transport.h + - src/core/ext/transport/chaotic_good/client_transport.h + - src/core/ext/transport/chaotic_good/frame.h + - src/core/ext/transport/chaotic_good/frame_header.h + - src/core/ext/transport/chaotic_good/server_transport.h + - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/inter_activity_pipe.h + - src/core/lib/promise/mpsc.h + - src/core/lib/promise/switch.h + - src/core/lib/promise/wait_set.h + - src/core/lib/transport/promise_endpoint.h + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/transport/test_suite/fixture.h + - test/core/transport/test_suite/test.h + src: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto + - src/core/ext/transport/chaotic_good/chaotic_good_transport.cc + - src/core/ext/transport/chaotic_good/client_transport.cc + - src/core/ext/transport/chaotic_good/frame.cc + - src/core/ext/transport/chaotic_good/frame_header.cc + - src/core/ext/transport/chaotic_good/server_transport.cc + - src/core/lib/transport/promise_endpoint.cc + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + - test/core/transport/test_suite/call_content.cc + - test/core/transport/test_suite/call_shapes.cc + - test/core/transport/test_suite/chaotic_good_fixture.cc + - test/core/transport/test_suite/fixture.cc + - test/core/transport/test_suite/no_op.cc + - test/core/transport/test_suite/stress.cc + - test/core/transport/test_suite/test.cc + - test/core/transport/test_suite/test_main.cc + deps: + - gtest + - protobuf + - grpc_test_util + platforms: + - linux + - posix - name: check_gcp_environment_linux_test gtest: true build: test @@ -10775,6 +10819,32 @@ targets: deps: - gtest - grpc_test_util +- name: inproc_test + gtest: true + build: test + language: c++ + headers: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/transport/test_suite/fixture.h + - test/core/transport/test_suite/test.h + src: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + - test/core/transport/test_suite/call_content.cc + - test/core/transport/test_suite/call_shapes.cc + - test/core/transport/test_suite/fixture.cc + - test/core/transport/test_suite/inproc_fixture.cc + - test/core/transport/test_suite/no_op.cc + - test/core/transport/test_suite/stress.cc + - test/core/transport/test_suite/test.cc + - test/core/transport/test_suite/test_main.cc + deps: + - gtest + - protobuf + - grpc_test_util + platforms: + - linux + - posix - name: insecure_security_connector_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 70d1333d6df..f4e12856904 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6530,11 +6530,13 @@ grpc_cc_library( deps = [ "chaotic_good_frame", "chaotic_good_frame_header", + "event_engine_tcp_socket_utils", "grpc_promise_endpoint", "if", "try_join", "try_seq", "//:gpr_platform", + "//:grpc_trace", "//:hpack_encoder", "//:promise", ], diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.cc b/src/core/ext/transport/chaotic_good/chaotic_good_transport.cc index 163f994d35f..655f7fd00aa 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.cc +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.cc @@ -16,4 +16,6 @@ #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" +grpc_core::TraceFlag grpc_chaotic_good_trace(false, "chaotic_good"); + namespace grpc_core {} // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h index 5dd83553638..89f869f3242 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h @@ -22,12 +22,16 @@ #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/promise/if.h" #include "src/core/lib/promise/promise.h" #include "src/core/lib/promise/try_join.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/transport/promise_endpoint.h" +extern grpc_core::TraceFlag grpc_chaotic_good_trace; + namespace grpc_core { namespace chaotic_good { @@ -40,6 +44,13 @@ class ChaoticGoodTransport { auto WriteFrame(const FrameInterface& frame) { auto buffers = frame.Serialize(&encoder_); + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s", + ResolvedAddressToString(control_endpoint_->GetPeerAddress()) + .value_or("<>") + .c_str(), + frame.ToString().c_str()); + } return TryJoin( control_endpoint_->Write(std::move(buffers.control)), data_endpoint_->Write(std::move(buffers.data))); @@ -54,6 +65,15 @@ class ChaoticGoodTransport { auto frame_header = FrameHeader::Parse(reinterpret_cast( GRPC_SLICE_START_PTR(read_buffer.c_slice()))); + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: ReadHeader from:%s %s", + ResolvedAddressToString(control_endpoint_->GetPeerAddress()) + .value_or("<>") + .c_str(), + frame_header.ok() + ? frame_header->ToString().c_str() + : frame_header.status().ToString().c_str()); + } // Read header and trailers from control endpoint. // Read message padding and message from data endpoint. return If( @@ -80,9 +100,11 @@ class ChaoticGoodTransport { std::move(std::get<1>(*buffers))}); }); }, - [&frame_header]() - -> absl::StatusOr> { - return frame_header.status(); + [&frame_header]() { + return [status = frame_header.status()]() mutable + -> absl::StatusOr> { + return std::move(status); + }; }); }); } diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index b0d5f6b3b22..59e04891925 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -165,11 +165,7 @@ auto ChaoticGoodClientTransport::TransportReadLoop() { } auto ChaoticGoodClientTransport::OnTransportActivityDone() { - return [this](absl::Status status) { - if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { - this->AbortWithError(); - } - }; + return [this](absl::Status) { AbortWithError(); }; } ChaoticGoodClientTransport::ChaoticGoodClientTransport( @@ -279,7 +275,17 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) { // At this point, the connection is set up. // Start sending data frames. call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable { - return CallOutboundLoop(MakeStream(call_handler), call_handler); + const uint32_t stream_id = MakeStream(call_handler); + return Map(CallOutboundLoop(stream_id, call_handler), + [stream_id, this](absl::Status result) { + if (!result.ok()) { + CancelFrame frame; + frame.stream_id = stream_id; + outgoing_frames_.MakeSender().UnbufferedImmediateSend( + std::move(frame)); + } + return result; + }); }); } diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 291031b7c64..041756a6a22 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -29,6 +29,7 @@ #include #include +#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" @@ -107,57 +108,98 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( }), [](StatusFlag status) { return StatusCast(status); }); }, - [error = std::move(error)]() { return error; }); + [&error, &frame]() { + gpr_log(GPR_INFO, + "CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s", + error.ToString().c_str(), frame.ToString().c_str()); + return Immediate(std::move(error)); + }); +} + +auto ChaoticGoodServerTransport::SendFragment( + ServerFragmentFrame frame, MpscSender outgoing_frames) { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s", + frame.ToString().c_str()); + return Map(outgoing_frames.Send(std::move(frame)), + [](bool success) -> absl::Status { + if (!success) { + // Failed to send outgoing frame. + return absl::UnavailableError("Transport closed."); + } + return absl::OkStatus(); + }); +} + +auto ChaoticGoodServerTransport::SendCallBody( + uint32_t stream_id, MpscSender outgoing_frames, + CallInitiator call_initiator) { + // Continuously send client frame with client to server + // messages. + return ForEach(OutgoingMessages(call_initiator), + [stream_id, outgoing_frames, aligned_bytes = aligned_bytes_]( + MessageHandle message) mutable { + ServerFragmentFrame frame; + // Construct frame header (flags, header_length + // and trailer_length will be added in + // serialization). + const uint32_t message_length = message->payload()->Length(); + const uint32_t padding = + message_length % aligned_bytes == 0 + ? 0 + : aligned_bytes - message_length % aligned_bytes; + GPR_ASSERT((message_length + padding) % aligned_bytes == 0); + frame.message = FragmentMessage(std::move(message), padding, + message_length); + frame.stream_id = stream_id; + return SendFragment(std::move(frame), outgoing_frames); + }); +} + +auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( + uint32_t stream_id, MpscSender outgoing_frames, + CallInitiator call_initiator) { + return TrySeq( + // Wait for initial metadata then send it out. + call_initiator.PullServerInitialMetadata(), + [stream_id, outgoing_frames, call_initiator, + this](absl::optional md) mutable { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=%s", + md.has_value() ? (*md)->DebugString().c_str() : "null"); + return If( + md.has_value(), + [&md, stream_id, &outgoing_frames, &call_initiator, this]() { + ServerFragmentFrame frame; + frame.headers = std::move(*md); + frame.stream_id = stream_id; + return TrySeq( + SendFragment(std::move(frame), outgoing_frames), + SendCallBody(stream_id, outgoing_frames, call_initiator)); + }, + []() { return absl::OkStatus(); }); + }); } auto ChaoticGoodServerTransport::CallOutboundLoop( uint32_t stream_id, CallInitiator call_initiator) { - auto send_fragment = [stream_id, - outgoing_frames = outgoing_frames_.MakeSender()]( - ServerFragmentFrame frame) mutable { - frame.stream_id = stream_id; - return Map(outgoing_frames.Send(std::move(frame)), - [](bool success) -> absl::Status { - if (!success) { - // Failed to send outgoing frame. - return absl::UnavailableError("Transport closed."); - } - return absl::OkStatus(); - }); - }; - return Seq( - TrySeq( - // Wait for initial metadata then send it out. - call_initiator.PullServerInitialMetadata(), - [send_fragment](ServerMetadataHandle md) mutable { - ServerFragmentFrame frame; - frame.headers = std::move(md); - return send_fragment(std::move(frame)); - }, - // Continuously send client frame with client to server messages. - ForEach(OutgoingMessages(call_initiator), - [send_fragment, aligned_bytes = aligned_bytes_]( - MessageHandle message) mutable { - ServerFragmentFrame frame; - // Construct frame header (flags, header_length and - // trailer_length will be added in serialization). - const uint32_t message_length = - message->payload()->Length(); - const uint32_t padding = - message_length % aligned_bytes == 0 - ? 0 - : aligned_bytes - message_length % aligned_bytes; - GPR_ASSERT((message_length + padding) % aligned_bytes == 0); - frame.message = FragmentMessage(std::move(message), padding, - message_length); - return send_fragment(std::move(frame)); - })), - call_initiator.PullServerTrailingMetadata(), - [send_fragment](ServerMetadataHandle md) mutable { - ServerFragmentFrame frame; - frame.trailers = std::move(md); - return send_fragment(std::move(frame)); - }); + auto outgoing_frames = outgoing_frames_.MakeSender(); + return Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames, + call_initiator), + [stream_id](absl::Status main_body_result) { + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_DEBUG, + "CHAOTIC_GOOD: CallOutboundLoop: stream_id=%d " + "main_body_result=%s", + stream_id, main_body_result.ToString().c_str()); + } + return Empty{}; + }), + call_initiator.PullServerTrailingMetadata(), + [stream_id, outgoing_frames](ServerMetadataHandle md) mutable { + ServerFragmentFrame frame; + frame.trailers = std::move(md); + frame.stream_id = stream_id; + return SendFragment(std::move(frame), outgoing_frames); + }); } auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( @@ -171,13 +213,27 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( if (status.ok()) { auto create_call_result = acceptor_->CreateCall(*fragment_frame.headers, arena.release()); + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, + "CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: " + "create_call_result=%s", + create_call_result.ok() + ? "ok" + : create_call_result.status().ToString().c_str()); + } if (create_call_result.ok()) { call_initiator.emplace(std::move(*create_call_result)); - call_initiator->SpawnGuarded( - "server-write", [this, stream_id = frame_header.stream_id, - call_initiator = *call_initiator]() { - return CallOutboundLoop(stream_id, call_initiator); - }); + auto add_result = NewStream(frame_header.stream_id, *call_initiator); + if (add_result.ok()) { + call_initiator->SpawnGuarded( + "server-write", [this, stream_id = frame_header.stream_id, + call_initiator = *call_initiator]() { + return CallOutboundLoop(stream_id, call_initiator); + }); + } else { + call_initiator.reset(); + status = add_result; + } } else { status = create_call_result.status(); } @@ -254,11 +310,15 @@ auto ChaoticGoodServerTransport::TransportReadLoop() { }); } -auto ChaoticGoodServerTransport::OnTransportActivityDone() { - return [this](absl::Status status) { - if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { - this->AbortWithError(); +auto ChaoticGoodServerTransport::OnTransportActivityDone( + absl::string_view activity) { + return [this, activity](absl::Status status) { + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, + "CHAOTIC_GOOD: OnTransportActivityDone: activity=%s status=%s", + std::string(activity).c_str(), status.ToString().c_str()); } + AbortWithError(); }; } @@ -274,7 +334,7 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport( event_engine_(event_engine), writer_{MakeActivity(TransportWriteLoop(), EventEngineWakeupScheduler(event_engine), - OnTransportActivityDone())}, + OnTransportActivityDone("writer"))}, reader_{nullptr} {} void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) { @@ -283,7 +343,7 @@ void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) { acceptor_ = acceptor; reader_ = MakeActivity(TransportReadLoop(), EventEngineWakeupScheduler(event_engine_), - OnTransportActivityDone()); + OnTransportActivityDone("reader")); } ChaoticGoodServerTransport::~ChaoticGoodServerTransport() { @@ -330,5 +390,19 @@ absl::optional ChaoticGoodServerTransport::ExtractStream( return std::move(r); } +absl::Status ChaoticGoodServerTransport::NewStream( + uint32_t stream_id, CallInitiator call_initiator) { + MutexLock lock(&mu_); + auto it = stream_map_.find(stream_id); + if (it != stream_map_.end()) { + return absl::InternalError("Stream already exists"); + } + if (stream_id <= last_seen_new_stream_id_) { + return absl::InternalError("Stream id is not increasing"); + } + stream_map_.emplace(stream_id, std::move(call_initiator)); + return absl::OkStatus(); +} + } // namespace chaotic_good } // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 9ce92928385..5cd52aa61f9 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -107,8 +107,15 @@ class ChaoticGoodServerTransport final : public Transport, absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator); absl::optional LookupStream(uint32_t stream_id); absl::optional ExtractStream(uint32_t stream_id); + auto SendCallInitialMetadataAndBody(uint32_t stream_id, + MpscSender outgoing_frames, + CallInitiator call_initiator); + auto SendCallBody(uint32_t stream_id, MpscSender outgoing_frames, + CallInitiator call_initiator); + static auto SendFragment(ServerFragmentFrame frame, + MpscSender outgoing_frames); auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); - auto OnTransportActivityDone(); + auto OnTransportActivityDone(absl::string_view activity); auto TransportReadLoop(); auto TransportWriteLoop(); // Read different parts of the server frame from control/data endpoints @@ -133,6 +140,7 @@ class ChaoticGoodServerTransport final : public Transport, Mutex mu_; // Map of stream incoming server frames, key is stream_id. StreamMap stream_map_ ABSL_GUARDED_BY(mu_); + uint32_t last_seen_new_stream_id_ = 0; grpc_event_engine::experimental::MemoryAllocator allocator_; std::shared_ptr event_engine_; ActivityPtr writer_; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 24fdd41d387..936068b0aa3 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -112,17 +112,15 @@ class InprocClientTransport final : public Transport, public ClientTransport { void StartCall(CallHandler call_handler) override { call_handler.SpawnGuarded( "pull_initial_metadata", - TrySeq( - call_handler.PullClientInitialMetadata(), - [server_transport = server_transport_, - call_handler](ClientMetadataHandle md) { - auto call_initiator = server_transport->AcceptCall(*md); - if (!call_initiator.ok()) return call_initiator.status(); - ForwardCall(call_handler, std::move(*call_initiator), - std::move(md)); - return absl::OkStatus(); - }, - ImmediateOkStatus())); + TrySeq(call_handler.PullClientInitialMetadata(), + [server_transport = server_transport_, + call_handler](ClientMetadataHandle md) { + auto call_initiator = server_transport->AcceptCall(*md); + if (!call_initiator.ok()) return call_initiator.status(); + ForwardCall(call_handler, std::move(*call_initiator), + std::move(md)); + return absl::OkStatus(); + })); } void Orphan() override { delete this; } @@ -172,8 +170,9 @@ RefCountedPtr MakeLameChannel(absl::string_view why, RefCountedPtr MakeInprocChannel(Server* server, ChannelArgs client_channel_args) { - auto client_transport = MakeOrphanable(); - auto server_transport = client_transport->GetServerTransport(); + auto transports = MakeInProcessTransportPair(); + auto client_transport = std::move(transports.first); + auto server_transport = std::move(transports.second); auto error = server->SetupTransport(server_transport.get(), nullptr, server->channel_args() @@ -195,6 +194,14 @@ RefCountedPtr MakeInprocChannel(Server* server, } } // namespace +std::pair, OrphanablePtr> +MakeInProcessTransportPair() { + auto client_transport = MakeOrphanable(); + auto server_transport = client_transport->GetServerTransport(); + return std::make_pair(std::move(client_transport), + std::move(server_transport)); +} + } // namespace grpc_core grpc_channel* grpc_inproc_channel_create(grpc_server* server, diff --git a/src/core/ext/transport/inproc/inproc_transport.h b/src/core/ext/transport/inproc/inproc_transport.h index db3e38c7936..fe41ed023c6 100644 --- a/src/core/ext/transport/inproc/inproc_transport.h +++ b/src/core/ext/transport/inproc/inproc_transport.h @@ -20,6 +20,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/transport/transport.h" grpc_channel* grpc_inproc_channel_create(grpc_server* server, const grpc_channel_args* args, @@ -27,4 +28,11 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, extern grpc_core::TraceFlag grpc_inproc_trace; +namespace grpc_core { + +std::pair, OrphanablePtr> +MakeInProcessTransportPair(); + +} + #endif // GRPC_SRC_CORE_EXT_TRANSPORT_INPROC_INPROC_TRANSPORT_H diff --git a/src/core/lib/gprpp/debug_location.h b/src/core/lib/gprpp/debug_location.h index c6c9b682869..73b9c04a5d6 100644 --- a/src/core/lib/gprpp/debug_location.h +++ b/src/core/lib/gprpp/debug_location.h @@ -65,6 +65,7 @@ class DebugLocation { DebugLocation(const char* file = GRPC_DEFAULT_FILE, int line = GRPC_DEFAULT_LINE) : location_(file, line) {} + explicit DebugLocation(SourceLocation location) : location_(location) {} const char* file() const { return location_.file(); } int line() const { return location_.line(); } @@ -75,6 +76,7 @@ class DebugLocation { class DebugLocation { public: DebugLocation() {} + explicit DebugLocation(SourceLocation) {} DebugLocation(const char* /* file */, int /* line */) {} const char* file() const { return nullptr; } int line() const { return -1; } diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 1dd6e271ccc..6b1f26d4feb 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -47,17 +47,14 @@ struct Done; template <> struct Done { - static absl::Status Make() { return absl::OkStatus(); } + static absl::Status Make(bool cancelled) { + return cancelled ? absl::CancelledError() : absl::OkStatus(); + } }; template <> struct Done { - static StatusFlag Make() { return StatusFlag(true); } -}; - -template <> -struct Done { - static Success Make() { return Success{}; } + static StatusFlag Make(bool cancelled) { return StatusFlag(!cancelled); } }; template @@ -139,7 +136,7 @@ class ForEach { reading_next_ = false; return PollAction(); } else { - return Done::Make(); + return Done::Make(p->cancelled()); } } return Pending(); diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h index d9a2d899637..dad7bbf2af6 100644 --- a/src/core/lib/promise/inter_activity_pipe.h +++ b/src/core/lib/promise/inter_activity_pipe.h @@ -35,6 +35,24 @@ namespace grpc_core { template class InterActivityPipe { + public: + class NextResult { + public: + template + explicit NextResult(Args&&... args) : value_(std::forward(args)...) {} + using value_type = T; + void reset() { value_.reset(); } + bool cancelled() const { return false; } + bool has_value() const { return value_.has_value(); } + const T& value() const { return value_.value(); } + T& value() { return value_.value(); } + const T& operator*() const { return *value_; } + T& operator*() { return *value_; } + + private: + absl::optional value_; + }; + private: class Center : public RefCounted { public: @@ -55,7 +73,7 @@ class InterActivityPipe { return true; } - Poll> Next() { + Poll Next() { ReleasableMutexLock lock(&mu_); if (count_ == 0) { if (closed_) return absl::nullopt; diff --git a/src/core/lib/promise/observable.h b/src/core/lib/promise/observable.h index f15700f6272..335fc393754 100644 --- a/src/core/lib/promise/observable.h +++ b/src/core/lib/promise/observable.h @@ -74,7 +74,7 @@ class Observable { GRPC_MUST_USE_RESULT Waker Add(Observer* observer) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { observers_.insert(observer); - return Activity::current()->MakeNonOwningWaker(); + return GetContext()->MakeNonOwningWaker(); } private: diff --git a/src/core/lib/promise/status_flag.h b/src/core/lib/promise/status_flag.h index d06f5534e11..700ad9c856b 100644 --- a/src/core/lib/promise/status_flag.h +++ b/src/core/lib/promise/status_flag.h @@ -77,6 +77,7 @@ class StatusFlag { bool ok() const { return value_; } bool operator==(StatusFlag other) const { return value_ == other.value_; } + std::string ToString() const { return value_ ? "ok" : "failed"; } template friend void AbslStringify(Sink& sink, StatusFlag flag) { diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index 6fa5e4633be..1fd08b8917e 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -59,9 +59,6 @@ PromiseEndpoint::GetLocalAddress() const { void PromiseEndpoint::ReadState::Complete(absl::Status status, size_t num_bytes_requested) { - gpr_log(GPR_ERROR, "PromiseEndpoint::ReadState::Complete: status:%s", - status.ToString().c_str()); - if (!status.ok()) { // Invalidates all previous reads. pending_buffer.Clear(); diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 1619ad83b0e..f4c66b3383c 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -229,9 +229,9 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, std::move(client_initial_metadata)); }); // Read messages from handler into initiator. - call_handler.SpawnGuarded( - "read_messages", [call_handler, call_initiator]() mutable { - return ForEach(OutgoingMessages(call_handler), + call_handler.SpawnGuarded("read_messages", [call_handler, + call_initiator]() mutable { + return Seq(ForEach(OutgoingMessages(call_handler), [call_initiator](MessageHandle msg) mutable { // Need to spawn a job into the initiator's activity to // push the message in. @@ -241,32 +241,47 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, return call_initiator.CancelIfFails( call_initiator.PushMessage(std::move(msg))); }); - }); - }); + }), + [call_initiator](StatusFlag result) mutable { + call_initiator.SpawnInfallible( + "finish-downstream", [call_initiator, result]() mutable { + if (result.ok()) { + call_initiator.FinishSends(); + } else { + call_initiator.Cancel(); + } + return Empty{}; + }); + return result; + }); + }); call_initiator.SpawnInfallible("read_the_things", [call_initiator, call_handler]() mutable { return Seq( call_initiator.CancelIfFails(TrySeq( call_initiator.PullServerInitialMetadata(), - [call_handler](ServerMetadataHandle md) mutable { + [call_handler, + call_initiator](absl::optional md) mutable { + const bool has_md = md.has_value(); call_handler.SpawnGuarded( "recv_initial_metadata", [md = std::move(md), call_handler]() mutable { return call_handler.PushServerInitialMetadata( std::move(md)); }); - return Success{}; - }, - ForEach(OutgoingMessages(call_initiator), - [call_handler](MessageHandle msg) mutable { - return call_handler.SpawnWaitable( - "recv_message", - [msg = std::move(msg), call_handler]() mutable { - return call_handler.CancelIfFails( - call_handler.PushMessage(std::move(msg))); - }); - }), - ImmediateOkStatus())), + return If( + has_md, + ForEach(OutgoingMessages(call_initiator), + [call_handler](MessageHandle msg) mutable { + return call_handler.SpawnWaitable( + "recv_message", + [msg = std::move(msg), call_handler]() mutable { + return call_handler.CancelIfFails( + call_handler.PushMessage(std::move(msg))); + }); + }), + []() -> StatusFlag { return Success{}; }); + })), call_initiator.PullServerTrailingMetadata(), [call_handler](ServerMetadataHandle md) mutable { call_handler.SpawnGuarded( diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 0815e2f72be..6b222eff3bf 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -54,10 +54,11 @@ #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/status.h" +#include "src/core/lib/promise/if.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/party.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/race.h" +#include "src/core/lib/promise/prioritized_race.h" #include "src/core/lib/promise/status_flag.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" @@ -257,6 +258,10 @@ class CallSpineInterface { "SpawnGuarded promise must return a status-like object"); party().Spawn(name, std::move(promise_factory), [this](ResultType r) { if (!IsStatusOk(r)) { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s", + r.ToString().c_str()); + } std::ignore = Cancel(StatusCast(std::move(r))); } }); @@ -357,22 +362,26 @@ class CallInitiator { auto PullServerInitialMetadata() { GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map(spine_->server_initial_metadata().receiver.Next(), - [](NextResult md) - -> ValueOrFailure { - if (!md.has_value()) return Failure{}; - return std::move(*md); + [](NextResult md) + -> ValueOrFailure> { + if (!md.has_value()) { + if (md.cancelled()) return Failure{}; + return absl::optional(); + } + return absl::optional(std::move(*md)); }); } auto PullServerTrailingMetadata() { GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Race(spine_->WaitForCancel(), - Map(spine_->server_trailing_metadata().receiver.Next(), - [spine = spine_](NextResult md) - -> ServerMetadataHandle { - GPR_ASSERT(md.has_value()); - return std::move(*md); - })); + return PrioritizedRace( + Map(spine_->server_trailing_metadata().receiver.Next(), + [spine = spine_]( + NextResult md) -> ServerMetadataHandle { + GPR_ASSERT(md.has_value()); + return std::move(*md); + }), + spine_->WaitForCancel()); } auto PullMessage() { @@ -439,15 +448,26 @@ class CallHandler { }); } - auto PushServerInitialMetadata(ServerMetadataHandle md) { + auto PushServerInitialMetadata(absl::optional md) { GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->server_initial_metadata().sender.Push(std::move(md)), - [](bool ok) { return StatusFlag(ok); }); + return If( + md.has_value(), + [&md, this]() { + return Map( + spine_->server_initial_metadata().sender.Push(std::move(*md)), + [](bool ok) { return StatusFlag(ok); }); + }, + [this]() { + spine_->server_initial_metadata().sender.Close(); + return []() -> StatusFlag { return Success{}; }; + }); } auto PushServerTrailingMetadata(ServerMetadataHandle md) { GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + spine_->server_initial_metadata().sender.Close(); spine_->server_to_client_messages().sender.Close(); + spine_->client_to_server_messages().receiver.CloseWithError(); spine_->CallOnDone(); return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), [](bool ok) { return StatusFlag(ok); }); diff --git a/test/core/promise/inter_activity_pipe_test.cc b/test/core/promise/inter_activity_pipe_test.cc index dffc796f268..a81ac5c1822 100644 --- a/test/core/promise/inter_activity_pipe_test.cc +++ b/test/core/promise/inter_activity_pipe_test.cc @@ -38,12 +38,12 @@ TEST(InterActivityPipe, CanSendAndReceive) { return absl::OkStatus(); })); EXPECT_FALSE(done); - auto b = - TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional n) { - EXPECT_EQ(n, 3); - done = true; - return absl::OkStatus(); - })); + auto b = TestActivity(Seq(pipe.receiver.Next(), + [&done](InterActivityPipe::NextResult n) { + EXPECT_EQ(n.value(), 3); + done = true; + return absl::OkStatus(); + })); EXPECT_TRUE(done); } @@ -63,12 +63,12 @@ TEST(InterActivityPipe, CanSendTwiceAndReceive) { EXPECT_FALSE(done); auto b = TestActivity(Seq( pipe.receiver.Next(), - [&pipe](absl::optional n) { - EXPECT_EQ(n, 3); + [&pipe](InterActivityPipe::NextResult n) { + EXPECT_EQ(n.value(), 3); return pipe.receiver.Next(); }, - [&done](absl::optional n) { - EXPECT_EQ(n, 4); + [&done](InterActivityPipe::NextResult n) { + EXPECT_EQ(n.value(), 4); done = true; return absl::OkStatus(); })); @@ -78,12 +78,12 @@ TEST(InterActivityPipe, CanSendTwiceAndReceive) { TEST(InterActivityPipe, CanReceiveAndSend) { InterActivityPipe pipe; bool done = false; - auto b = - TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional n) { - EXPECT_EQ(n, 3); - done = true; - return absl::OkStatus(); - })); + auto b = TestActivity(Seq(pipe.receiver.Next(), + [&done](InterActivityPipe::NextResult n) { + EXPECT_EQ(n.value(), 3); + done = true; + return absl::OkStatus(); + })); EXPECT_FALSE(done); auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) { EXPECT_TRUE(b); @@ -95,12 +95,12 @@ TEST(InterActivityPipe, CanReceiveAndSend) { TEST(InterActivityPipe, CanClose) { InterActivityPipe pipe; bool done = false; - auto b = - TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional n) { - EXPECT_EQ(n, absl::nullopt); - done = true; - return absl::OkStatus(); - })); + auto b = TestActivity(Seq(pipe.receiver.Next(), + [&done](InterActivityPipe::NextResult n) { + EXPECT_FALSE(n.has_value()); + done = true; + return absl::OkStatus(); + })); EXPECT_FALSE(done); // Drop the sender { auto x = std::move(pipe.sender); } diff --git a/test/core/transport/chaotic_good/client_transport_error_test.cc b/test/core/transport/chaotic_good/client_transport_error_test.cc index 295e060b809..9beb4dfa010 100644 --- a/test/core/transport/chaotic_good/client_transport_error_test.cc +++ b/test/core/transport/chaotic_good/client_transport_error_test.cc @@ -186,7 +186,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, @@ -231,7 +231,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) { "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, @@ -294,7 +294,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { "test-read-1", [&on_done1, initiator = call1.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, @@ -310,7 +310,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { "test-read-2", [&on_done2, initiator = call2.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, @@ -365,7 +365,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { "test-read", [&on_done1, initiator = call1.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, @@ -381,7 +381,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { "test-read", [&on_done2, initiator = call2.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_FALSE(md.ok()); return Empty{}; }, diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 551c6fd5762..880a92c2f45 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -135,11 +135,14 @@ TEST_F(TransportTest, AddOneStream) { "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_TRUE(md.ok()); - EXPECT_EQ( - md.value()->get_pointer(HttpPathMetadata())->as_string_view(), - "/demo.Service/Step"); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(md.value() + .value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); return Empty{}; }, initiator.PullMessage(), @@ -219,11 +222,14 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( initiator.PullServerInitialMetadata(), - [](ValueOrFailure md) { + [](ValueOrFailure> md) { EXPECT_TRUE(md.ok()); - EXPECT_EQ( - md.value()->get_pointer(HttpPathMetadata())->as_string_view(), - "/demo.Service/Step"); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(md.value() + .value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); return Empty{}; }, initiator.PullMessage(), diff --git a/test/core/transport/test_suite/BUILD b/test/core/transport/test_suite/BUILD new file mode 100644 index 00000000000..4bc8d7fcaa2 --- /dev/null +++ b/test/core/transport/test_suite/BUILD @@ -0,0 +1,165 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load( + "//bazel:grpc_build_system.bzl", + "grpc_cc_library", + "grpc_package", + "grpc_proto_library", +) +load("grpc_transport_test.bzl", "grpc_transport_test") + +grpc_package(name = "test/core/transport/test_suite") + +grpc_cc_library( + name = "fixture", + testonly = 1, + srcs = ["fixture.cc"], + hdrs = ["fixture.h"], + deps = [ + "//:grpc", + "//test/core/event_engine/fuzzing_event_engine", + ], +) + +grpc_cc_library( + name = "inproc_fixture", + testonly = 1, + srcs = ["inproc_fixture.cc"], + deps = [ + "fixture", + "//src/core:grpc_transport_inproc", + ], + alwayslink = 1, +) + +grpc_cc_library( + name = "chaotic_good_fixture", + testonly = 1, + srcs = ["chaotic_good_fixture.cc"], + external_deps = ["gtest"], + deps = [ + "fixture", + "//src/core:chaotic_good_client_transport", + "//src/core:chaotic_good_server_transport", + "//src/core:event_engine_memory_allocator_factory", + "//src/core:event_engine_tcp_socket_utils", + "//src/core:grpc_promise_endpoint", + "//src/core:resource_quota", + ], + alwayslink = 1, +) + +grpc_cc_library( + name = "test", + testonly = 1, + srcs = ["test.cc"], + hdrs = ["test.h"], + external_deps = [ + "absl/functional:any_invocable", + "absl/random", + "absl/random:bit_gen_ref", + "absl/strings", + "gtest", + ], + deps = [ + "fixture", + "//:iomgr_timer", + "//:promise", + "//src/core:cancel_callback", + "//src/core:resource_quota", + "//src/core:time", + "//test/core/event_engine/fuzzing_event_engine", + "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + ], +) + +grpc_cc_library( + name = "test_main", + testonly = 1, + srcs = ["test_main.cc"], + external_deps = ["absl/random"], + deps = [ + "fixture", + "test", + "//:grpc_trace", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_library( + name = "call_content", + testonly = 1, + srcs = ["call_content.cc"], + external_deps = ["gtest"], + deps = ["test"], + alwayslink = 1, +) + +grpc_cc_library( + name = "call_shapes", + testonly = 1, + srcs = ["call_shapes.cc"], + deps = ["test"], + alwayslink = 1, +) + +grpc_cc_library( + name = "no_op", + testonly = 1, + srcs = ["no_op.cc"], + deps = ["test"], + alwayslink = 1, +) + +grpc_cc_library( + name = "stress", + testonly = 1, + srcs = ["stress.cc"], + external_deps = ["absl/random"], + deps = ["test"], + alwayslink = 1, +) + +grpc_proto_library( + name = "fuzzer_proto", + srcs = ["fuzzer.proto"], + has_services = False, + deps = [ + "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + "//test/core/util:fuzz_config_vars_proto", + ], +) + +grpc_transport_test( + name = "inproc", + deps = [ + ":call_content", + ":call_shapes", + ":inproc_fixture", + ":no_op", + ":stress", + ], +) + +grpc_transport_test( + name = "chaotic_good", + deps = [ + ":call_content", + ":call_shapes", + ":chaotic_good_fixture", + ":no_op", + ":stress", + ], +) diff --git a/test/core/transport/test_suite/call_content.cc b/test/core/transport/test_suite/call_content.cc new file mode 100644 index 00000000000..c565bb76696 --- /dev/null +++ b/test/core/transport/test_suite/call_content.cc @@ -0,0 +1,151 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "gmock/gmock.h" + +#include "test/core/transport/test_suite/test.h" + +using testing::UnorderedElementsAreArray; + +namespace grpc_core { + +namespace { +class LoweringEncoder { + public: + std::vector> Take() { + return std::move(metadata_); + } + + void Encode(const Slice& key, const Slice& value) { + metadata_.emplace_back(key.as_string_view(), value.as_string_view()); + } + + template + void Encode(Which, const typename Which::ValueType& value) { + metadata_.emplace_back(Which::key(), Which::Encode(value).as_string_view()); + } + + private: + std::vector> metadata_; +}; + +std::vector> LowerMetadata( + const grpc_metadata_batch& metadata) { + LoweringEncoder encoder; + metadata.Encode(&encoder); + return encoder.Take(); +} + +void FillMetadata(const std::vector>& md, + grpc_metadata_batch& out) { + for (const auto& p : md) { + out.Append(p.first, Slice::FromCopiedString(p.second), + [&](absl::string_view error, const Slice& value) { + Crash(absl::StrCat( + "Failed to parse metadata for '", p.first, "': ", error, + " value=", absl::CEscape(value.as_string_view()))); + }); + } +} + +} // namespace + +TRANSPORT_TEST(UnaryWithSomeContent) { + SetServerAcceptor(); + auto initiator = CreateCall(); + const auto client_initial_metadata = RandomMetadata(); + const auto server_initial_metadata = RandomMetadata(); + const auto server_trailing_metadata = RandomMetadata(); + const auto client_payload = RandomMessage(); + const auto server_payload = RandomMessage(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + FillMetadata(client_initial_metadata, *md); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString(client_payload)), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_THAT(LowerMetadata(***md), + UnorderedElementsAreArray(server_initial_metadata)); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), server_payload); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_THAT(LowerMetadata(**md), + UnorderedElementsAreArray(server_trailing_metadata)); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_THAT(LowerMetadata(**md), + UnorderedElementsAreArray(client_initial_metadata)); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), client_payload); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + auto md = Arena::MakePooled(GetContext()); + FillMetadata(server_initial_metadata, *md); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString(server_payload)), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + FillMetadata(server_trailing_metadata, *md); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/call_shapes.cc b/test/core/transport/test_suite/call_shapes.cc new file mode 100644 index 00000000000..392e8037c49 --- /dev/null +++ b/test/core/transport/test_suite/call_shapes.cc @@ -0,0 +1,681 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/test_suite/test.h" + +namespace grpc_core { + +TRANSPORT_TEST(MetadataOnlyRequest) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + auto md = Arena::MakePooled(GetContext()); + md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + // We don't close the sending stream here. + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure got_md) { + EXPECT_TRUE(got_md.ok()); + EXPECT_EQ( + got_md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + // Don't wait for end of stream for client->server messages, just + // publish initial then trailing metadata. + auto md = Arena::MakePooled(GetContext()); + md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + // We don't close the sending stream here. + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_FALSE(md.value().has_value()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure got_md) { + EXPECT_TRUE(got_md.ok()); + EXPECT_EQ( + got_md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + // Don't wait for end of stream for client->server messages, just + // and don't send initial metadata - just trailing metadata. + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(CanCreateCallThenAbandonIt) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "start-call", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq(initiator, "end-call", [&]() { + initiator.Cancel(); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(UnaryRequest) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + auto md = Arena::MakePooled(GetContext()); + md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor"); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + auto md = Arena::MakePooled(GetContext()); + md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + auto md_out = Arena::MakePooled(GetContext()); + md_out->Set(ContentTypeMetadata(), + ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md_out)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(ClientStreamingRequest) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world (2)")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world (3)")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world (4)")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("hello world (5)")), 0)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + auto md_out = Arena::MakePooled(GetContext()); + md_out->Set(ContentTypeMetadata(), + ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md_out)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (2)"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (3)"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (4)"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (5)"); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +TRANSPORT_TEST(ServerStreamingRequest) { + SetServerAcceptor(); + auto initiator = CreateCall(); + SpawnTestSeq( + initiator, "initiator", + [&]() { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [&](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PullServerInitialMetadata(); + }, + [&](ValueOrFailure> md) { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + initiator.FinishSends(); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor (2)"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor (3)"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor (4)"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor (5)"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + "why hello neighbor (6)"); + return initiator.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + auto handler = TickUntilServerCall(); + SpawnTestSeq( + handler, "handler", [&] { return handler.PullClientInitialMetadata(); }, + [&](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/foo/bar"); + auto md_out = Arena::MakePooled(GetContext()); + md_out->Set(ContentTypeMetadata(), + ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md_out)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PullMessage(); + }, + [&](NextResult msg) { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor (2)")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor (3)")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor (4)")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor (5)")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("why hello neighbor (6)")), 0)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [&](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + WaitForAllPendingWork(); +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/chaotic_good_fixture.cc b/test/core/transport/test_suite/chaotic_good_fixture.cc new file mode 100644 index 00000000000..c9d9e0724e6 --- /dev/null +++ b/test/core/transport/test_suite/chaotic_good_fixture.cc @@ -0,0 +1,117 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "gmock/gmock.h" + +#include "src/core/ext/transport/chaotic_good/client_transport.h" +#include "src/core/ext/transport/chaotic_good/server_transport.h" +#include "src/core/lib/event_engine/memory_allocator_factory.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "src/core/lib/transport/promise_endpoint.h" +#include "test/core/transport/test_suite/fixture.h" + +using grpc_event_engine::experimental::EndpointConfig; +using grpc_event_engine::experimental::EventEngine; +using grpc_event_engine::experimental::FuzzingEventEngine; +using grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory; +using grpc_event_engine::experimental::URIToResolvedAddress; + +namespace grpc_core { + +namespace { + +class MockEndpointConfig : public EndpointConfig { + public: + MOCK_METHOD(absl::optional, GetInt, (absl::string_view key), + (const, override)); + MOCK_METHOD(absl::optional, GetString, + (absl::string_view key), (const, override)); + MOCK_METHOD(void*, GetVoidPointer, (absl::string_view key), + (const, override)); +}; + +struct EndpointPair { + std::unique_ptr client; + std::unique_ptr server; +}; + +EndpointPair CreateEndpointPair( + grpc_event_engine::experimental::FuzzingEventEngine* event_engine, + ResourceQuotaRefPtr resource_quota, int port) { + std::unique_ptr client_endpoint; + std::unique_ptr server_endpoint; + + const auto resolved_address = + URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value(); + + ::testing::StrictMock endpoint_config; + auto listener = *event_engine->CreateListener( + [&server_endpoint](std::unique_ptr endpoint, + MemoryAllocator) { + server_endpoint = std::move(endpoint); + }, + [](absl::Status) {}, endpoint_config, + std::make_unique( + resource_quota->memory_quota())); + GPR_ASSERT(listener->Bind(resolved_address).ok()); + GPR_ASSERT(listener->Start().ok()); + + event_engine->Connect( + [&client_endpoint]( + absl::StatusOr> endpoint) { + GPR_ASSERT(endpoint.ok()); + client_endpoint = std::move(endpoint).value(); + }, + resolved_address, endpoint_config, + resource_quota->memory_quota()->CreateMemoryAllocator("client"), + Duration::Hours(3)); + + while (client_endpoint == nullptr || server_endpoint == nullptr) { + event_engine->Tick(); + } + + return EndpointPair{std::make_unique( + std::move(client_endpoint), SliceBuffer()), + std::make_unique( + std::move(server_endpoint), SliceBuffer())}; +} + +} // namespace + +TRANSPORT_FIXTURE(ChaoticGood) { + auto resource_quota = MakeResourceQuota("test"); + EndpointPair control_endpoints = + CreateEndpointPair(event_engine.get(), resource_quota, 1234); + EndpointPair data_endpoints = + CreateEndpointPair(event_engine.get(), resource_quota, 4321); + auto channel_args = + ChannelArgs() + .SetObject(resource_quota) + .SetObject(std::static_pointer_cast(event_engine)); + auto client_transport = + MakeOrphanable( + std::move(control_endpoints.client), std::move(data_endpoints.client), + event_engine); + auto server_transport = + MakeOrphanable( + channel_args, std::move(control_endpoints.server), + std::move(data_endpoints.server), event_engine); + return ClientAndServerTransportPair{std::move(client_transport), + std::move(server_transport)}; +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/corpus/chaotic_good/empty b/test/core/transport/test_suite/corpus/chaotic_good/empty new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/test/core/transport/test_suite/corpus/chaotic_good/empty @@ -0,0 +1 @@ + diff --git a/test/core/transport/test_suite/corpus/inproc/empty b/test/core/transport/test_suite/corpus/inproc/empty new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/test/core/transport/test_suite/corpus/inproc/empty @@ -0,0 +1 @@ + diff --git a/test/core/transport/test_suite/fixture.cc b/test/core/transport/test_suite/fixture.cc new file mode 100644 index 00000000000..c1f26103693 --- /dev/null +++ b/test/core/transport/test_suite/fixture.cc @@ -0,0 +1,29 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/test_suite/fixture.h" + +namespace grpc_core { + +TransportFixtureRegistry& TransportFixtureRegistry::Get() { + static TransportFixtureRegistry* registry = new TransportFixtureRegistry(); + return *registry; +} +void TransportFixtureRegistry::RegisterFixture( + absl::string_view name, + absl::AnyInvocable create) { + fixtures_.push_back({name, std::move(create)}); +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/fixture.h b/test/core/transport/test_suite/fixture.h new file mode 100644 index 00000000000..a9aad176510 --- /dev/null +++ b/test/core/transport/test_suite/fixture.h @@ -0,0 +1,77 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H +#define GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H + +#include "src/core/lib/transport/transport.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" + +namespace grpc_core { + +class TransportFixture { + public: + struct ClientAndServerTransportPair { + OrphanablePtr client; + OrphanablePtr server; + }; + virtual ~TransportFixture() = default; + virtual ClientAndServerTransportPair CreateTransportPair( + std::shared_ptr + event_engine) = 0; +}; + +class TransportFixtureRegistry { + public: + static TransportFixtureRegistry& Get(); + void RegisterFixture(absl::string_view name, + absl::AnyInvocable create); + + struct Fixture { + absl::string_view name; + absl::AnyInvocable create; + }; + + const std::vector& fixtures() const { return fixtures_; } + + private: + std::vector fixtures_; +}; + +} // namespace grpc_core + +#define TRANSPORT_FIXTURE(name) \ + class TransportFixture_##name : public grpc_core::TransportFixture { \ + public: \ + using TransportFixture::TransportFixture; \ + ClientAndServerTransportPair CreateTransportPair( \ + std::shared_ptr \ + event_engine) override; \ + \ + private: \ + static grpc_core::TransportFixture* Create() { \ + return new TransportFixture_##name(); \ + } \ + static int registered_; \ + }; \ + int TransportFixture_##name::registered_ = \ + (grpc_core::TransportFixtureRegistry::Get().RegisterFixture( \ + #name, &TransportFixture_##name::Create), \ + 0); \ + grpc_core::TransportFixture::ClientAndServerTransportPair \ + TransportFixture_##name::CreateTransportPair( \ + std::shared_ptr \ + event_engine GRPC_UNUSED) + +#endif // GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H diff --git a/test/core/transport/test_suite/fuzzer.proto b/test/core/transport/test_suite/fuzzer.proto new file mode 100644 index 00000000000..8fadbacc35d --- /dev/null +++ b/test/core/transport/test_suite/fuzzer.proto @@ -0,0 +1,29 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package transport_test_suite; + +import "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto"; +import "test/core/util/fuzz_config_vars.proto"; + +message Msg { + uint32 test_id = 1; + uint32 fixture_id = 2; + + fuzzing_event_engine.Actions event_engine_actions = 10; + grpc.testing.FuzzConfigVars config_vars = 11; + repeated uint64 rng = 12; +} diff --git a/test/core/transport/test_suite/fuzzer_main.cc b/test/core/transport/test_suite/fuzzer_main.cc new file mode 100644 index 00000000000..94cd06d54e6 --- /dev/null +++ b/test/core/transport/test_suite/fuzzer_main.cc @@ -0,0 +1,72 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include + +#include "src/core/lib/config/config_vars.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/experiments/config.h" +#include "src/core/lib/gprpp/env.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include "test/core/transport/test_suite/fixture.h" +#include "test/core/transport/test_suite/fuzzer.pb.h" +#include "test/core/transport/test_suite/test.h" +#include "test/core/util/fuzz_config_vars.h" +#include "test/core/util/proto_bit_gen.h" + +namespace grpc_event_engine { +namespace experimental { +extern bool g_event_engine_supports_fd; +} +} // namespace grpc_event_engine + +bool squelch = true; +static void dont_log(gpr_log_func_args* /*args*/) {} + +DEFINE_PROTO_FUZZER(const transport_test_suite::Msg& msg) { + const auto& tests = grpc_core::TransportTestRegistry::Get().tests(); + const auto& fixtures = grpc_core::TransportFixtureRegistry::Get().fixtures(); + GPR_ASSERT(!tests.empty()); + GPR_ASSERT(!fixtures.empty()); + const int test_id = msg.test_id() % tests.size(); + const int fixture_id = msg.fixture_id() % fixtures.size(); + + if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { + gpr_set_log_function(dont_log); + } + + grpc_core::ConfigVars::Overrides overrides = + grpc_core::OverridesFromFuzzConfigVars(msg.config_vars()); + grpc_core::ConfigVars::SetOverrides(overrides); + grpc_core::TestOnlyReloadExperimentsFromConfigVariables(); + if (!squelch) { + fprintf(stderr, "RUN TEST '%s' with fixture '%s'\n", + std::string(tests[test_id].name).c_str(), + std::string(fixtures[fixture_id].name).c_str()); + } + grpc_core::ProtoBitGen bitgen(msg.rng()); + auto test = + tests[test_id].create(std::unique_ptr( + fixtures[fixture_id].create()), + msg.event_engine_actions(), bitgen); + test->RunTest(); + delete test; + GPR_ASSERT(!::testing::Test::HasFailure()); +} diff --git a/test/core/transport/test_suite/grpc_transport_test.bzl b/test/core/transport/test_suite/grpc_transport_test.bzl new file mode 100644 index 00000000000..52793f8846b --- /dev/null +++ b/test/core/transport/test_suite/grpc_transport_test.bzl @@ -0,0 +1,58 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Generate one transport test & associated fuzzer +""" + +load("//bazel:grpc_build_system.bzl", "grpc_cc_test") +load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer") + +def grpc_transport_test(name, deps): + grpc_cc_test( + name = name + "_test", + srcs = [], + tags = ["no_windows", "no_mac"], + deps = [ + ":test_main", + ] + deps, + ) + + grpc_proto_fuzzer( + name = name + "_fuzzer", + srcs = ["fuzzer_main.cc"], + tags = ["no_windows", "no_mac"], + external_deps = [ + "gtest", + ], + deps = [ + ":test", + ":fixture", + ":fuzzer_proto", + "//:event_engine_base_hdrs", + "//:config_vars", + "//:exec_ctx", + "//:gpr", + "//:grpc_unsecure", + "//:iomgr_timer", + "//src/core:default_event_engine", + "//src/core:env", + "//src/core:experiments", + "//test/core/event_engine/fuzzing_event_engine", + "//test/core/util:fuzz_config_vars", + "//test/core/util:proto_bit_gen", + ] + deps, + corpus = "corpus/%s" % name, + proto = None, + ) diff --git a/test/core/transport/test_suite/inproc_fixture.cc b/test/core/transport/test_suite/inproc_fixture.cc new file mode 100644 index 00000000000..357bbef66be --- /dev/null +++ b/test/core/transport/test_suite/inproc_fixture.cc @@ -0,0 +1,25 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/transport/inproc/inproc_transport.h" +#include "test/core/transport/test_suite/fixture.h" + +namespace grpc_core { + +TRANSPORT_FIXTURE(Inproc) { + auto transports = MakeInProcessTransportPair(); + return {std::move(transports.first), std::move(transports.second)}; +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/no_op.cc b/test/core/transport/test_suite/no_op.cc new file mode 100644 index 00000000000..0efd226257d --- /dev/null +++ b/test/core/transport/test_suite/no_op.cc @@ -0,0 +1,28 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/test_suite/test.h" + +namespace grpc_core { + +TRANSPORT_TEST(NoOp) {} + +TRANSPORT_TEST(WaitForAllPendingWork) { WaitForAllPendingWork(); } + +TRANSPORT_TEST(SetServerAcceptorAndFinish) { + SetServerAcceptor(); + WaitForAllPendingWork(); +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/stress.cc b/test/core/transport/test_suite/stress.cc new file mode 100644 index 00000000000..2adaeec45bf --- /dev/null +++ b/test/core/transport/test_suite/stress.cc @@ -0,0 +1,131 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/random/random.h" + +#include "test/core/transport/test_suite/test.h" + +namespace grpc_core { + +TRANSPORT_TEST(ManyUnaryRequests) { + SetServerAcceptor(); + const int kNumRequests = absl::LogUniform(rng(), 10, 100); + std::list call_names; + auto make_call_name = [&call_names](int i, + absl::string_view suffix) -> const char* { + call_names.emplace_back(absl::StrCat("call-", i, "-", suffix)); + return call_names.back().c_str(); + }; + std::map client_messages; + std::map server_messages; + for (int i = 0; i < kNumRequests; i++) { + auto initiator = CreateCall(); + client_messages[i] = RandomMessage(); + server_messages[i] = RandomMessage(); + SpawnTestSeq( + initiator, make_call_name(i, "initiator"), + [initiator, i]() mutable { + auto md = Arena::MakePooled(GetContext()); + md->Set(HttpPathMetadata(), + Slice::FromCopiedString(std::to_string(i))); + return initiator.PushClientInitialMetadata(std::move(md)); + }, + [initiator, i, &client_messages](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + return initiator.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString(client_messages[i])), 0)); + }, + [initiator](StatusFlag status) mutable { + EXPECT_TRUE(status.ok()); + initiator.FinishSends(); + return initiator.PullServerInitialMetadata(); + }, + [initiator]( + ValueOrFailure> md) mutable { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(md.value().has_value()); + EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()), + ContentTypeMetadata::kApplicationGrpc); + return initiator.PullMessage(); + }, + [initiator, i, + &server_messages](NextResult msg) mutable { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + server_messages[i]); + return initiator.PullMessage(); + }, + [initiator](NextResult msg) mutable { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + return initiator.PullServerTrailingMetadata(); + }, + [initiator](ValueOrFailure md) mutable { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()), + GRPC_STATUS_UNIMPLEMENTED); + return Empty{}; + }); + } + for (int i = 0; i < kNumRequests; i++) { + auto handler = TickUntilServerCall(); + auto this_call_index = std::make_shared(-1); + SpawnTestSeq( + handler, make_call_name(i, "handler"), + [handler]() mutable { return handler.PullClientInitialMetadata(); }, + [handler, + this_call_index](ValueOrFailure md) mutable { + EXPECT_TRUE(md.ok()); + EXPECT_TRUE(absl::SimpleAtoi( + md.value()->get_pointer(HttpPathMetadata())->as_string_view(), + &*this_call_index)); + return handler.PullMessage(); + }, + [handler, this_call_index, + &client_messages](NextResult msg) mutable { + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + client_messages[*this_call_index]); + return handler.PullMessage(); + }, + [handler](NextResult msg) mutable { + EXPECT_FALSE(msg.has_value()); + EXPECT_FALSE(msg.cancelled()); + auto md = Arena::MakePooled(GetContext()); + md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); + return handler.PushServerInitialMetadata(std::move(md)); + }, + [handler, this_call_index, + &server_messages](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return handler.PushMessage(Arena::MakePooled( + SliceBuffer( + Slice::FromCopiedString(server_messages[*this_call_index])), + 0)); + }, + [handler](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + auto md = Arena::MakePooled(GetContext()); + md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); + return handler.PushServerTrailingMetadata(std::move(md)); + }, + [handler](StatusFlag result) mutable { + EXPECT_TRUE(result.ok()); + return Empty{}; + }); + } + WaitForAllPendingWork(); +} + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/test.cc b/test/core/transport/test_suite/test.cc new file mode 100644 index 00000000000..6d48a5712fa --- /dev/null +++ b/test/core/transport/test_suite/test.cc @@ -0,0 +1,271 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/test_suite/test.h" + +#include + +#include "absl/random/random.h" + +namespace grpc_core { + +/////////////////////////////////////////////////////////////////////////////// +// TransportTestRegistry + +TransportTestRegistry& TransportTestRegistry::Get() { + static TransportTestRegistry* registry = new TransportTestRegistry(); + return *registry; +} + +void TransportTestRegistry::RegisterTest( + absl::string_view name, + absl::AnyInvocable, + const fuzzing_event_engine::Actions&, + absl::BitGenRef) const> + create) { + if (absl::StartsWith(name, "DISABLED_")) return; + tests_.push_back({name, std::move(create)}); +} + +/////////////////////////////////////////////////////////////////////////////// +// TransportTest + +void TransportTest::RunTest() { + TestImpl(); + EXPECT_EQ(pending_actions_.size(), 0) + << "There are still pending actions: did you forget to call " + "WaitForAllPendingWork()?"; + transport_pair_.client.reset(); + transport_pair_.server.reset(); + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +void TransportTest::SetServerAcceptor() { + transport_pair_.server->server_transport()->SetAcceptor(&acceptor_); +} + +CallInitiator TransportTest::CreateCall() { + auto call = MakeCall(event_engine_.get(), Arena::Create(1024, &allocator_)); + call.handler.SpawnInfallible("start-call", [this, handler = call.handler]() { + transport_pair_.client->client_transport()->StartCall(handler); + return Empty{}; + }); + return std::move(call.initiator); +} + +CallHandler TransportTest::TickUntilServerCall() { + WatchDog watchdog(this); + for (;;) { + auto handler = acceptor_.PopHandler(); + if (handler.has_value()) return std::move(*handler); + event_engine_->Tick(); + } +} + +void TransportTest::WaitForAllPendingWork() { + WatchDog watchdog(this); + while (!pending_actions_.empty()) { + if (pending_actions_.front()->IsDone()) { + pending_actions_.pop(); + continue; + } + event_engine_->Tick(); + } +} + +void TransportTest::Timeout() { + std::vector lines; + lines.emplace_back("Timeout waiting for pending actions to complete"); + while (!pending_actions_.empty()) { + auto action = std::move(pending_actions_.front()); + pending_actions_.pop(); + if (action->IsDone()) continue; + absl::string_view state_name = + transport_test_detail::ActionState::StateString(action->Get()); + absl::string_view file_name = action->file(); + auto pos = file_name.find_last_of('/'); + if (pos != absl::string_view::npos) { + file_name = file_name.substr(pos + 1); + } + lines.emplace_back(absl::StrCat(" ", state_name, " ", action->name(), " [", + action->step(), "]: ", file_name, ":", + action->line())); + } + Crash(absl::StrJoin(lines, "\n")); +} + +std::string TransportTest::RandomString(int min_length, int max_length, + absl::string_view character_set) { + std::string out; + int length = absl::LogUniform(rng_, min_length, max_length + 1); + for (int i = 0; i < length; ++i) { + out.push_back( + character_set[absl::Uniform(rng_, 0, character_set.size())]); + } + return out; +} + +std::string TransportTest::RandomStringFrom( + std::initializer_list choices) { + size_t idx = absl::Uniform(rng_, 0, choices.size()); + auto it = choices.begin(); + for (size_t i = 0; i < idx; ++i) ++it; + return std::string(*it); +} + +std::string TransportTest::RandomMetadataKey() { + if (absl::Bernoulli(rng_, 0.1)) { + return RandomStringFrom({ + ":path", + ":method", + ":status", + ":authority", + ":scheme", + }); + } + std::string out; + do { + out = RandomString(1, 128, "abcdefghijklmnopqrstuvwxyz-_"); + } while (absl::EndsWith(out, "-bin")); + return out; +} + +std::string TransportTest::RandomMetadataValue(absl::string_view key) { + if (key == ":method") { + return RandomStringFrom({"GET", "POST", "PUT"}); + } + if (key == ":status") { + return absl::StrCat(absl::Uniform(rng_, 100, 600)); + } + if (key == ":scheme") { + return RandomStringFrom({"http", "https"}); + } + if (key == "te") { + return "trailers"; + } + static const NoDestruct kChars{[]() { + std::string out; + for (char c = 32; c < 127; c++) out.push_back(c); + return out; + }()}; + return RandomString(0, 128, *kChars); +} + +std::string TransportTest::RandomMetadataBinaryKey() { + return RandomString(1, 128, "abcdefghijklmnopqrstuvwxyz-_") + "-bin"; +} + +std::string TransportTest::RandomMetadataBinaryValue() { + static const NoDestruct kChars{[]() { + std::string out; + for (int c = 0; c < 256; c++) { + out.push_back(static_cast(static_cast(c))); + } + return out; + }()}; + return RandomString(0, 4096, *kChars); +} + +std::vector> +TransportTest::RandomMetadata() { + size_t size = 0; + const size_t max_size = absl::LogUniform(rng_, 64, 8000); + std::vector> out; + for (;;) { + std::string key; + std::string value; + if (absl::Bernoulli(rng_, 0.1)) { + key = RandomMetadataBinaryKey(); + value = RandomMetadataBinaryValue(); + } else { + key = RandomMetadataKey(); + value = RandomMetadataValue(key); + } + bool include = true; + for (size_t i = 0; i < out.size(); ++i) { + if (out[i].first == key) { + include = false; + break; + } + } + if (!include) continue; + size_t this_size = 32 + key.size() + value.size(); + if (size + this_size > max_size) { + if (out.empty()) continue; + break; + } + size += this_size; + out.emplace_back(std::move(key), std::move(value)); + } + return out; +} + +std::string TransportTest::RandomMessage() { + static const NoDestruct kChars{[]() { + std::string out; + for (int c = 0; c < 256; c++) { + out.push_back(static_cast(static_cast(c))); + } + return out; + }()}; + return RandomString(0, 1024 * 1024, *kChars); +} + +/////////////////////////////////////////////////////////////////////////////// +// TransportTest::Acceptor + +Arena* TransportTest::Acceptor::CreateArena() { + return Arena::Create(1024, allocator_); +} + +absl::StatusOr TransportTest::Acceptor::CreateCall( + ClientMetadata&, Arena* arena) { + auto call = MakeCall(event_engine_, arena); + handlers_.push(std::move(call.handler)); + return std::move(call.initiator); +} + +absl::optional TransportTest::Acceptor::PopHandler() { + if (!handlers_.empty()) { + auto handler = std::move(handlers_.front()); + handlers_.pop(); + return handler; + } + return absl::nullopt; +} + +/////////////////////////////////////////////////////////////////////////////// +// ActionState + +namespace transport_test_detail { + +ActionState::ActionState(NameAndLocation name_and_location) + : name_and_location_(name_and_location), state_(kNotCreated) {} + +bool ActionState::IsDone() { + switch (state_) { + case kNotCreated: + case kNotStarted: + case kStarted: + return false; + case kDone: + case kCancelled: + return true; + } +} + +} // namespace transport_test_detail + +} // namespace grpc_core diff --git a/test/core/transport/test_suite/test.h b/test/core/transport/test_suite/test.h new file mode 100644 index 00000000000..6981d5ca51f --- /dev/null +++ b/test/core/transport/test_suite/test.h @@ -0,0 +1,364 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H +#define GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H + +#include +#include +#include + +#include "absl/functional/any_invocable.h" +#include "absl/random/bit_gen_ref.h" +#include "absl/strings/string_view.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/promise/cancel_callback.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" +#include "test/core/transport/test_suite/fixture.h" + +namespace grpc_core { + +namespace transport_test_detail { + +struct NameAndLocation { + // NOLINTNEXTLINE + NameAndLocation(const char* name, SourceLocation location = {}) + : location_(location), name_(name) {} + NameAndLocation Next() const { + return NameAndLocation(name_, location_, step_ + 1); + } + + SourceLocation location() const { return location_; } + absl::string_view name() const { return name_; } + int step() const { return step_; } + + private: + NameAndLocation(absl::string_view name, SourceLocation location, int step) + : location_(location), name_(name), step_(step) {} + SourceLocation location_; + absl::string_view name_; + int step_ = 1; +}; + +class ActionState { + public: + enum State : uint8_t { + kNotCreated, + kNotStarted, + kStarted, + kDone, + kCancelled, + }; + + static absl::string_view StateString(State state) { + switch (state) { + case kNotCreated: + return "🚦"; + case kNotStarted: + return "⏰"; + case kStarted: + return "🚗"; + case kDone: + return "🏁"; + case kCancelled: + return "💥"; + } + } + + explicit ActionState(NameAndLocation name_and_location); + + State Get() const { return state_; } + void Set(State state) { + gpr_log(GPR_INFO, "%s", + absl::StrCat(StateString(state), " ", name(), " [", step(), "] ", + file(), ":", line()) + .c_str()); + state_ = state; + } + const NameAndLocation& name_and_location() const { + return name_and_location_; + } + SourceLocation location() const { return name_and_location().location(); } + const char* file() const { return location().file(); } + int line() const { return location().line(); } + absl::string_view name() const { return name_and_location().name(); } + int step() const { return name_and_location().step(); } + bool IsDone(); + + private: + const NameAndLocation name_and_location_; + std::atomic state_; +}; + +using PromiseSpawner = std::function)>; +using ActionStateFactory = + absl::FunctionRef(NameAndLocation)>; + +template +PromiseSpawner SpawnerForContext( + Context context, + grpc_event_engine::experimental::EventEngine* event_engine) { + return [context = std::move(context), event_engine]( + absl::string_view name, Promise promise) mutable { + // Pass new promises via event engine to allow fuzzers to explore + // reorderings of possibly interleaved spawns. + event_engine->Run([name, context = std::move(context), + promise = std::move(promise)]() mutable { + context.SpawnInfallible(name, std::move(promise)); + }); + }; +} + +template +using NextSpawner = absl::AnyInvocable; + +template +Promise WrapPromiseAndNext(std::shared_ptr action_state, + Promise promise, NextSpawner next) { + return Promise(OnCancel( + [action_state, promise = std::move(promise), + next = std::move(next)]() mutable -> Poll { + action_state->Set(ActionState::kStarted); + auto r = promise(); + if (auto* p = r.value_if_ready()) { + action_state->Set(ActionState::kDone); + next(std::move(*p)); + return Empty{}; + } else { + return Pending{}; + } + }, + [action_state]() { action_state->Set(ActionState::kCancelled); })); +} + +template +NextSpawner WrapFollowUps(NameAndLocation, ActionStateFactory, + PromiseSpawner) { + return [](Empty) {}; +} + +template +NextSpawner WrapFollowUps(NameAndLocation loc, + ActionStateFactory action_state_factory, + PromiseSpawner spawner, FirstFollowUp first, + FollowUps... follow_ups) { + using Factory = promise_detail::OncePromiseFactory; + using FactoryPromise = typename Factory::Promise; + using Result = typename FactoryPromise::Result; + auto action_state = action_state_factory(loc); + return [spawner, factory = Factory(std::move(first)), + next = WrapFollowUps(loc.Next(), action_state_factory, + spawner, std::move(follow_ups)...), + action_state = std::move(action_state), + name = loc.name()](Arg arg) mutable { + action_state->Set(ActionState::kNotStarted); + spawner(name, + WrapPromiseAndNext(std::move(action_state), + Promise(factory.Make(std::move(arg))), + std::move(next))); + }; +} + +template +void StartSeq(NameAndLocation loc, ActionStateFactory action_state_factory, + PromiseSpawner spawner, First first, FollowUps... followups) { + using Factory = promise_detail::OncePromiseFactory; + using FactoryPromise = typename Factory::Promise; + using Result = typename FactoryPromise::Result; + auto action_state = action_state_factory(loc); + auto next = WrapFollowUps(loc.Next(), action_state_factory, spawner, + std::move(followups)...); + spawner( + loc.name(), + [spawner, first = Factory(std::move(first)), next = std::move(next), + action_state = std::move(action_state), name = loc.name()]() mutable { + action_state->Set(ActionState::kNotStarted); + spawner(name, WrapPromiseAndNext(std::move(action_state), + Promise(first.Make()), + std::move(next))); + return Empty{}; + }); +} + +}; // namespace transport_test_detail + +class TransportTest : public ::testing::Test { + public: + void RunTest(); + + protected: + TransportTest(std::unique_ptr fixture, + const fuzzing_event_engine::Actions& actions, + absl::BitGenRef rng) + : event_engine_(std::make_shared< + grpc_event_engine::experimental::FuzzingEventEngine>( + []() { + grpc_timer_manager_set_threading(false); + grpc_event_engine::experimental::FuzzingEventEngine::Options + options; + return options; + }(), + actions)), + fixture_(std::move(fixture)), + rng_(rng) {} + + void SetServerAcceptor(); + CallInitiator CreateCall(); + + std::string RandomString(int min_length, int max_length, + absl::string_view character_set); + std::string RandomStringFrom( + std::initializer_list choices); + std::string RandomMetadataKey(); + std::string RandomMetadataValue(absl::string_view key); + std::string RandomMetadataBinaryKey(); + std::string RandomMetadataBinaryValue(); + std::vector> RandomMetadata(); + std::string RandomMessage(); + absl::BitGenRef rng() { return rng_; } + + CallHandler TickUntilServerCall(); + void WaitForAllPendingWork(); + + // Alternative for Seq for test driver code. + // Registers each step so that WaitForAllPendingWork() can report progress, + // and wait for completion... AND generate good failure messages when a + // sequence doesn't complete in a timely manner. + template + void SpawnTestSeq(Context context, + transport_test_detail::NameAndLocation name_and_location, + Actions... actions) { + transport_test_detail::StartSeq( + name_and_location, + [this](transport_test_detail::NameAndLocation name_and_location) { + auto action = std::make_shared( + name_and_location); + pending_actions_.push(action); + return action; + }, + transport_test_detail::SpawnerForContext(std::move(context), + event_engine_.get()), + std::move(actions)...); + } + + private: + virtual void TestImpl() = 0; + + void Timeout(); + + class Acceptor final : public ServerTransport::Acceptor { + public: + Acceptor(grpc_event_engine::experimental::EventEngine* event_engine, + MemoryAllocator* allocator) + : event_engine_(event_engine), allocator_(allocator) {} + + Arena* CreateArena() override; + absl::StatusOr CreateCall( + ClientMetadata& client_initial_metadata, Arena* arena) override; + absl::optional PopHandler(); + + private: + std::queue handlers_; + grpc_event_engine::experimental::EventEngine* const event_engine_; + MemoryAllocator* const allocator_; + }; + + class WatchDog { + public: + explicit WatchDog(TransportTest* test) : test_(test) {} + ~WatchDog() { test_->event_engine_->Cancel(timer_); } + + private: + TransportTest* const test_; + grpc_event_engine::experimental::EventEngine::TaskHandle const timer_{ + test_->event_engine_->RunAfter(Duration::Minutes(5), + [this]() { test_->Timeout(); })}; + }; + + std::shared_ptr + event_engine_{ + std::make_shared( + []() { + grpc_timer_manager_set_threading(false); + grpc_event_engine::experimental::FuzzingEventEngine::Options + options; + return options; + }(), + fuzzing_event_engine::Actions())}; + std::unique_ptr fixture_; + MemoryAllocator allocator_ = MakeResourceQuota("test-quota") + ->memory_quota() + ->CreateMemoryAllocator("test-allocator"); + Acceptor acceptor_{event_engine_.get(), &allocator_}; + TransportFixture::ClientAndServerTransportPair transport_pair_ = + fixture_->CreateTransportPair(event_engine_); + std::queue> + pending_actions_; + absl::BitGenRef rng_; +}; + +class TransportTestRegistry { + public: + static TransportTestRegistry& Get(); + void RegisterTest( + absl::string_view name, + absl::AnyInvocable, + const fuzzing_event_engine::Actions&, + absl::BitGenRef) const> + create); + + struct Test { + absl::string_view name; + absl::AnyInvocable, + const fuzzing_event_engine::Actions&, + absl::BitGenRef) const> + create; + }; + + const std::vector& tests() const { return tests_; } + + private: + std::vector tests_; +}; + +} // namespace grpc_core + +#define TRANSPORT_TEST(name) \ + class TransportTest_##name : public grpc_core::TransportTest { \ + public: \ + using TransportTest::TransportTest; \ + void TestBody() override { RunTest(); } \ + \ + private: \ + void TestImpl() override; \ + static grpc_core::TransportTest* Create( \ + std::unique_ptr fixture, \ + const fuzzing_event_engine::Actions& actions, absl::BitGenRef rng) { \ + return new TransportTest_##name(std::move(fixture), actions, rng); \ + } \ + static int registered_; \ + }; \ + int TransportTest_##name::registered_ = \ + (grpc_core::TransportTestRegistry::Get().RegisterTest(#name, &Create), \ + 0); \ + void TransportTest_##name::TestImpl() + +#endif // GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H diff --git a/test/core/transport/test_suite/test_main.cc b/test/core/transport/test_suite/test_main.cc new file mode 100644 index 00000000000..4080f1cfc64 --- /dev/null +++ b/test/core/transport/test_suite/test_main.cc @@ -0,0 +1,42 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/random/random.h" + +#include "src/core/lib/debug/trace.h" +#include "test/core/transport/test_suite/fixture.h" +#include "test/core/transport/test_suite/test.h" +#include "test/core/util/test_config.h" + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + absl::BitGen bitgen; + ::testing::InitGoogleTest(&argc, argv); + for (const auto& test : grpc_core::TransportTestRegistry::Get().tests()) { + for (const auto& fixture : + grpc_core::TransportFixtureRegistry::Get().fixtures()) { + ::testing::RegisterTest( + "TransportTest", absl::StrCat(test.name, "/", fixture.name).c_str(), + nullptr, nullptr, __FILE__, __LINE__, + [test = &test, fixture = &fixture, + &bitgen]() -> grpc_core::TransportTest* { + return test->create( + std::unique_ptr(fixture->create()), + fuzzing_event_engine::Actions(), bitgen); + }); + } + } + grpc_tracer_init(); + return RUN_ALL_TESTS(); +} diff --git a/tools/distrib/fix_build_deps.py b/tools/distrib/fix_build_deps.py index 18af4cdecd8..9dc0445ddd9 100755 --- a/tools/distrib/fix_build_deps.py +++ b/tools/distrib/fix_build_deps.py @@ -403,6 +403,7 @@ for dirname in [ "test/core/promise", "test/core/resource_quota", "test/core/transport/chaotic_good", + "test/core/transport/test_suite", "fuzztest", "fuzztest/core/channel", "fuzztest/core/transport/chttp2", @@ -423,6 +424,7 @@ for dirname in [ "grpc_cc_library": grpc_cc_library, "grpc_cc_test": grpc_cc_library, "grpc_core_end2end_test": lambda **kwargs: None, + "grpc_transport_test": lambda **kwargs: None, "grpc_fuzzer": grpc_cc_library, "grpc_fuzz_test": grpc_cc_library, "grpc_proto_fuzzer": grpc_cc_library, @@ -655,14 +657,28 @@ def make_library(library): return (library, error, deps, external_deps) +def matches_target(library, target): + if not target.startswith("//"): + if "/" in target: + target = "//" + target + else: + target = "//:" + target + if target == "..." or target == "//...": + return True + if target.endswith("/..."): + return library.startswith(target[:-4]) + return library == target + + def main() -> None: update_libraries = [] for library in sorted(consumes.keys()): if library in no_update: continue - if args.targets and library not in args.targets: - continue - update_libraries.append(library) + for target in args.targets: + if matches_target(library, target): + update_libraries.append(library) + break with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as p: updated_libraries = p.map(make_library, update_libraries, 1) diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index ca55f6ef0c5..6d3fc272028 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1945,6 +1945,26 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "chaotic_good_test", + "platforms": [ + "linux", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -5035,6 +5055,26 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "inproc_test", + "platforms": [ + "linux", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,