[transport] Add a transport test suite for promise based transports (#35476)

Implemented for inproc & chaotic-good

Closes #35476

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35476 from ctiller:v3-svall 5358538d54
PiperOrigin-RevId: 599701775
pull/32487/head^2
Craig Tiller 1 year ago committed by Copybara-Service
parent f41de0825c
commit 1751f1043e
  1. 1
      BUILD
  2. 126
      CMakeLists.txt
  3. 70
      build_autogenerated.yaml
  4. 2
      src/core/BUILD
  5. 2
      src/core/ext/transport/chaotic_good/chaotic_good_transport.cc
  6. 28
      src/core/ext/transport/chaotic_good/chaotic_good_transport.h
  7. 18
      src/core/ext/transport/chaotic_good/client_transport.cc
  8. 190
      src/core/ext/transport/chaotic_good/server_transport.cc
  9. 10
      src/core/ext/transport/chaotic_good/server_transport.h
  10. 33
      src/core/ext/transport/inproc/inproc_transport.cc
  11. 8
      src/core/ext/transport/inproc/inproc_transport.h
  12. 2
      src/core/lib/gprpp/debug_location.h
  13. 13
      src/core/lib/promise/for_each.h
  14. 20
      src/core/lib/promise/inter_activity_pipe.h
  15. 2
      src/core/lib/promise/observable.h
  16. 1
      src/core/lib/promise/status_flag.h
  17. 3
      src/core/lib/transport/promise_endpoint.cc
  18. 51
      src/core/lib/transport/transport.cc
  19. 50
      src/core/lib/transport/transport.h
  20. 44
      test/core/promise/inter_activity_pipe_test.cc
  21. 12
      test/core/transport/chaotic_good/client_transport_error_test.cc
  22. 22
      test/core/transport/chaotic_good/client_transport_test.cc
  23. 165
      test/core/transport/test_suite/BUILD
  24. 151
      test/core/transport/test_suite/call_content.cc
  25. 681
      test/core/transport/test_suite/call_shapes.cc
  26. 117
      test/core/transport/test_suite/chaotic_good_fixture.cc
  27. 1
      test/core/transport/test_suite/corpus/chaotic_good/empty
  28. 1
      test/core/transport/test_suite/corpus/inproc/empty
  29. 29
      test/core/transport/test_suite/fixture.cc
  30. 77
      test/core/transport/test_suite/fixture.h
  31. 29
      test/core/transport/test_suite/fuzzer.proto
  32. 72
      test/core/transport/test_suite/fuzzer_main.cc
  33. 58
      test/core/transport/test_suite/grpc_transport_test.bzl
  34. 25
      test/core/transport/test_suite/inproc_fixture.cc
  35. 28
      test/core/transport/test_suite/no_op.cc
  36. 131
      test/core/transport/test_suite/stress.cc
  37. 271
      test/core/transport/test_suite/test.cc
  38. 364
      test/core/transport/test_suite/test.h
  39. 42
      test/core/transport/test_suite/test_main.cc
  40. 22
      tools/distrib/fix_build_deps.py
  41. 40
      tools/run_tests/generated/tests.json

@ -1580,6 +1580,7 @@ grpc_cc_library(
"//src/core:poll",
"//src/core:pollset_set",
"//src/core:posix_event_engine_base_hdrs",
"//src/core:prioritized_race",
"//src/core:promise_status",
"//src/core:race",
"//src/core:random_early_detection",

126
CMakeLists.txt generated

@ -957,6 +957,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx channel_trace_test)
add_dependencies(buildtests_cxx channelz_registry_test)
add_dependencies(buildtests_cxx channelz_service_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx chaotic_good_test)
endif()
add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
add_dependencies(buildtests_cxx check_gcp_environment_windows_test)
add_dependencies(buildtests_cxx chunked_vector_test)
@ -1129,6 +1132,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx if_test)
add_dependencies(buildtests_cxx init_test)
add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx inproc_test)
endif()
add_dependencies(buildtests_cxx insecure_security_connector_test)
add_dependencies(buildtests_cxx inter_activity_latch_test)
add_dependencies(buildtests_cxx inter_activity_pipe_test)
@ -9884,6 +9890,69 @@ target_link_libraries(channelz_service_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(chaotic_good_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
src/core/ext/transport/chaotic_good/chaotic_good_transport.cc
src/core/ext/transport/chaotic_good/client_transport.cc
src/core/ext/transport/chaotic_good/frame.cc
src/core/ext/transport/chaotic_good/frame_header.cc
src/core/ext/transport/chaotic_good/server_transport.cc
src/core/lib/transport/promise_endpoint.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/transport/test_suite/call_content.cc
test/core/transport/test_suite/call_shapes.cc
test/core/transport/test_suite/chaotic_good_fixture.cc
test/core/transport/test_suite/fixture.cc
test/core/transport/test_suite/no_op.cc
test/core/transport/test_suite/stress.cc
test/core/transport/test_suite/test.cc
test/core/transport/test_suite/test_main.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(chaotic_good_test
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(chaotic_good_test PUBLIC cxx_std_14)
target_include_directories(chaotic_good_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(chaotic_good_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
@ -17208,6 +17277,63 @@ target_link_libraries(initial_settings_frame_bad_client_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(inproc_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/transport/test_suite/call_content.cc
test/core/transport/test_suite/call_shapes.cc
test/core/transport/test_suite/fixture.cc
test/core/transport/test_suite/inproc_fixture.cc
test/core/transport/test_suite/no_op.cc
test/core/transport/test_suite/stress.cc
test/core/transport/test_suite/test.cc
test/core/transport/test_suite/test_main.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(inproc_test
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(inproc_test PUBLIC cxx_std_14)
target_include_directories(inproc_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(inproc_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -4572,6 +4572,7 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@ -7262,6 +7263,49 @@ targets:
- gtest
- grpcpp_channelz
- grpc++_test_util
- name: chaotic_good_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/chaotic_good/chaotic_good_transport.h
- src/core/ext/transport/chaotic_good/client_transport.h
- src/core/ext/transport/chaotic_good/frame.h
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/ext/transport/chaotic_good/server_transport.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/switch.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/transport/test_suite/fixture.h
- test/core/transport/test_suite/test.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/chaotic_good_transport.cc
- src/core/ext/transport/chaotic_good/client_transport.cc
- src/core/ext/transport/chaotic_good/frame.cc
- src/core/ext/transport/chaotic_good/frame_header.cc
- src/core/ext/transport/chaotic_good/server_transport.cc
- src/core/lib/transport/promise_endpoint.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/transport/test_suite/call_content.cc
- test/core/transport/test_suite/call_shapes.cc
- test/core/transport/test_suite/chaotic_good_fixture.cc
- test/core/transport/test_suite/fixture.cc
- test/core/transport/test_suite/no_op.cc
- test/core/transport/test_suite/stress.cc
- test/core/transport/test_suite/test.cc
- test/core/transport/test_suite/test_main.cc
deps:
- gtest
- protobuf
- grpc_test_util
platforms:
- linux
- posix
- name: check_gcp_environment_linux_test
gtest: true
build: test
@ -10775,6 +10819,32 @@ targets:
deps:
- gtest
- grpc_test_util
- name: inproc_test
gtest: true
build: test
language: c++
headers:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/transport/test_suite/fixture.h
- test/core/transport/test_suite/test.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/transport/test_suite/call_content.cc
- test/core/transport/test_suite/call_shapes.cc
- test/core/transport/test_suite/fixture.cc
- test/core/transport/test_suite/inproc_fixture.cc
- test/core/transport/test_suite/no_op.cc
- test/core/transport/test_suite/stress.cc
- test/core/transport/test_suite/test.cc
- test/core/transport/test_suite/test_main.cc
deps:
- gtest
- protobuf
- grpc_test_util
platforms:
- linux
- posix
- name: insecure_security_connector_test
gtest: true
build: test

@ -6530,11 +6530,13 @@ grpc_cc_library(
deps = [
"chaotic_good_frame",
"chaotic_good_frame_header",
"event_engine_tcp_socket_utils",
"grpc_promise_endpoint",
"if",
"try_join",
"try_seq",
"//:gpr_platform",
"//:grpc_trace",
"//:hpack_encoder",
"//:promise",
],

@ -16,4 +16,6 @@
#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
grpc_core::TraceFlag grpc_chaotic_good_trace(false, "chaotic_good");
namespace grpc_core {} // namespace grpc_core

@ -22,12 +22,16 @@
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/promise_endpoint.h"
extern grpc_core::TraceFlag grpc_chaotic_good_trace;
namespace grpc_core {
namespace chaotic_good {
@ -40,6 +44,13 @@ class ChaoticGoodTransport {
auto WriteFrame(const FrameInterface& frame) {
auto buffers = frame.Serialize(&encoder_);
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s",
ResolvedAddressToString(control_endpoint_->GetPeerAddress())
.value_or("<<unknown peer address>>")
.c_str(),
frame.ToString().c_str());
}
return TryJoin<absl::StatusOr>(
control_endpoint_->Write(std::move(buffers.control)),
data_endpoint_->Write(std::move(buffers.data)));
@ -54,6 +65,15 @@ class ChaoticGoodTransport {
auto frame_header =
FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(read_buffer.c_slice())));
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: ReadHeader from:%s %s",
ResolvedAddressToString(control_endpoint_->GetPeerAddress())
.value_or("<<unknown peer address>>")
.c_str(),
frame_header.ok()
? frame_header->ToString().c_str()
: frame_header.status().ToString().c_str());
}
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
return If(
@ -80,9 +100,11 @@ class ChaoticGoodTransport {
std::move(std::get<1>(*buffers))});
});
},
[&frame_header]()
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
return frame_header.status();
[&frame_header]() {
return [status = frame_header.status()]() mutable
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
return std::move(status);
};
});
});
}

@ -165,11 +165,7 @@ auto ChaoticGoodClientTransport::TransportReadLoop() {
}
auto ChaoticGoodClientTransport::OnTransportActivityDone() {
return [this](absl::Status status) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
}
};
return [this](absl::Status) { AbortWithError(); };
}
ChaoticGoodClientTransport::ChaoticGoodClientTransport(
@ -279,7 +275,17 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
// At this point, the connection is set up.
// Start sending data frames.
call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
return CallOutboundLoop(MakeStream(call_handler), call_handler);
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
[stream_id, this](absl::Status result) {
if (!result.ok()) {
CancelFrame frame;
frame.stream_id = stream_id;
outgoing_frames_.MakeSender().UnbufferedImmediateSend(
std::move(frame));
}
return result;
});
});
}

@ -29,6 +29,7 @@
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
@ -107,57 +108,98 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}),
[](StatusFlag status) { return StatusCast<absl::Status>(status); });
},
[error = std::move(error)]() { return error; });
[&error, &frame]() {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s",
error.ToString().c_str(), frame.ToString().c_str());
return Immediate(std::move(error));
});
}
auto ChaoticGoodServerTransport::SendFragment(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s",
frame.ToString().c_str());
return Map(outgoing_frames.Send(std::move(frame)),
[](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
}
return absl::OkStatus();
});
}
auto ChaoticGoodServerTransport::SendCallBody(
uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
// Continuously send client frame with client to server
// messages.
return ForEach(OutgoingMessages(call_initiator),
[stream_id, outgoing_frames, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
});
}
auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
return TrySeq(
// Wait for initial metadata then send it out.
call_initiator.PullServerInitialMetadata(),
[stream_id, outgoing_frames, call_initiator,
this](absl::optional<ServerMetadataHandle> md) mutable {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=%s",
md.has_value() ? (*md)->DebugString().c_str() : "null");
return If(
md.has_value(),
[&md, stream_id, &outgoing_frames, &call_initiator, this]() {
ServerFragmentFrame frame;
frame.headers = std::move(*md);
frame.stream_id = stream_id;
return TrySeq(
SendFragment(std::move(frame), outgoing_frames),
SendCallBody(stream_id, outgoing_frames, call_initiator));
},
[]() { return absl::OkStatus(); });
});
}
auto ChaoticGoodServerTransport::CallOutboundLoop(
uint32_t stream_id, CallInitiator call_initiator) {
auto send_fragment = [stream_id,
outgoing_frames = outgoing_frames_.MakeSender()](
ServerFragmentFrame frame) mutable {
frame.stream_id = stream_id;
return Map(outgoing_frames.Send(std::move(frame)),
[](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
}
return absl::OkStatus();
});
};
return Seq(
TrySeq(
// Wait for initial metadata then send it out.
call_initiator.PullServerInitialMetadata(),
[send_fragment](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.headers = std::move(md);
return send_fragment(std::move(frame));
},
// Continuously send client frame with client to server messages.
ForEach(OutgoingMessages(call_initiator),
[send_fragment, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
const uint32_t message_length =
message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
return send_fragment(std::move(frame));
})),
call_initiator.PullServerTrailingMetadata(),
[send_fragment](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
return send_fragment(std::move(frame));
});
auto outgoing_frames = outgoing_frames_.MakeSender();
return Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
call_initiator),
[stream_id](absl::Status main_body_result) {
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_DEBUG,
"CHAOTIC_GOOD: CallOutboundLoop: stream_id=%d "
"main_body_result=%s",
stream_id, main_body_result.ToString().c_str());
}
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
[stream_id, outgoing_frames](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
});
}
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
@ -171,13 +213,27 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
if (status.ok()) {
auto create_call_result =
acceptor_->CreateCall(*fragment_frame.headers, arena.release());
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "
"create_call_result=%s",
create_call_result.ok()
? "ok"
: create_call_result.status().ToString().c_str());
}
if (create_call_result.ok()) {
call_initiator.emplace(std::move(*create_call_result));
call_initiator->SpawnGuarded(
"server-write", [this, stream_id = frame_header.stream_id,
call_initiator = *call_initiator]() {
return CallOutboundLoop(stream_id, call_initiator);
});
auto add_result = NewStream(frame_header.stream_id, *call_initiator);
if (add_result.ok()) {
call_initiator->SpawnGuarded(
"server-write", [this, stream_id = frame_header.stream_id,
call_initiator = *call_initiator]() {
return CallOutboundLoop(stream_id, call_initiator);
});
} else {
call_initiator.reset();
status = add_result;
}
} else {
status = create_call_result.status();
}
@ -254,11 +310,15 @@ auto ChaoticGoodServerTransport::TransportReadLoop() {
});
}
auto ChaoticGoodServerTransport::OnTransportActivityDone() {
return [this](absl::Status status) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
auto ChaoticGoodServerTransport::OnTransportActivityDone(
absl::string_view activity) {
return [this, activity](absl::Status status) {
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: OnTransportActivityDone: activity=%s status=%s",
std::string(activity).c_str(), status.ToString().c_str());
}
AbortWithError();
};
}
@ -274,7 +334,7 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
event_engine_(event_engine),
writer_{MakeActivity(TransportWriteLoop(),
EventEngineWakeupScheduler(event_engine),
OnTransportActivityDone())},
OnTransportActivityDone("writer"))},
reader_{nullptr} {}
void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
@ -283,7 +343,7 @@ void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
acceptor_ = acceptor;
reader_ = MakeActivity(TransportReadLoop(),
EventEngineWakeupScheduler(event_engine_),
OnTransportActivityDone());
OnTransportActivityDone("reader"));
}
ChaoticGoodServerTransport::~ChaoticGoodServerTransport() {
@ -330,5 +390,19 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
return std::move(r);
}
absl::Status ChaoticGoodServerTransport::NewStream(
uint32_t stream_id, CallInitiator call_initiator) {
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return absl::InternalError("Stream already exists");
}
if (stream_id <= last_seen_new_stream_id_) {
return absl::InternalError("Stream id is not increasing");
}
stream_map_.emplace(stream_id, std::move(call_initiator));
return absl::OkStatus();
}
} // namespace chaotic_good
} // namespace grpc_core

@ -107,8 +107,15 @@ class ChaoticGoodServerTransport final : public Transport,
absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator);
absl::optional<CallInitiator> LookupStream(uint32_t stream_id);
absl::optional<CallInitiator> ExtractStream(uint32_t stream_id);
auto SendCallInitialMetadataAndBody(uint32_t stream_id,
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
static auto SendFragment(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames);
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
auto OnTransportActivityDone();
auto OnTransportActivityDone(absl::string_view activity);
auto TransportReadLoop();
auto TransportWriteLoop();
// Read different parts of the server frame from control/data endpoints
@ -133,6 +140,7 @@ class ChaoticGoodServerTransport final : public Transport,
Mutex mu_;
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
grpc_event_engine::experimental::MemoryAllocator allocator_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
ActivityPtr writer_;

@ -112,17 +112,15 @@ class InprocClientTransport final : public Transport, public ClientTransport {
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
TrySeq(
call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator = server_transport->AcceptCall(*md);
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator),
std::move(md));
return absl::OkStatus();
},
ImmediateOkStatus()));
TrySeq(call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator = server_transport->AcceptCall(*md);
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator),
std::move(md));
return absl::OkStatus();
}));
}
void Orphan() override { delete this; }
@ -172,8 +170,9 @@ RefCountedPtr<Channel> MakeLameChannel(absl::string_view why,
RefCountedPtr<Channel> MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
auto client_transport = MakeOrphanable<InprocClientTransport>();
auto server_transport = client_transport->GetServerTransport();
auto transports = MakeInProcessTransportPair();
auto client_transport = std::move(transports.first);
auto server_transport = std::move(transports.second);
auto error =
server->SetupTransport(server_transport.get(), nullptr,
server->channel_args()
@ -195,6 +194,14 @@ RefCountedPtr<Channel> MakeInprocChannel(Server* server,
}
} // namespace
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair() {
auto client_transport = MakeOrphanable<InprocClientTransport>();
auto server_transport = client_transport->GetServerTransport();
return std::make_pair(std::move(client_transport),
std::move(server_transport));
}
} // namespace grpc_core
grpc_channel* grpc_inproc_channel_create(grpc_server* server,

@ -20,6 +20,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/transport.h"
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
const grpc_channel_args* args,
@ -27,4 +28,11 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
extern grpc_core::TraceFlag grpc_inproc_trace;
namespace grpc_core {
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair();
}
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_INPROC_INPROC_TRANSPORT_H

@ -65,6 +65,7 @@ class DebugLocation {
DebugLocation(const char* file = GRPC_DEFAULT_FILE,
int line = GRPC_DEFAULT_LINE)
: location_(file, line) {}
explicit DebugLocation(SourceLocation location) : location_(location) {}
const char* file() const { return location_.file(); }
int line() const { return location_.line(); }
@ -75,6 +76,7 @@ class DebugLocation {
class DebugLocation {
public:
DebugLocation() {}
explicit DebugLocation(SourceLocation) {}
DebugLocation(const char* /* file */, int /* line */) {}
const char* file() const { return nullptr; }
int line() const { return -1; }

@ -47,17 +47,14 @@ struct Done;
template <>
struct Done<absl::Status> {
static absl::Status Make() { return absl::OkStatus(); }
static absl::Status Make(bool cancelled) {
return cancelled ? absl::CancelledError() : absl::OkStatus();
}
};
template <>
struct Done<StatusFlag> {
static StatusFlag Make() { return StatusFlag(true); }
};
template <>
struct Done<Success> {
static Success Make() { return Success{}; }
static StatusFlag Make(bool cancelled) { return StatusFlag(!cancelled); }
};
template <typename Reader, typename Action>
@ -139,7 +136,7 @@ class ForEach {
reading_next_ = false;
return PollAction();
} else {
return Done<Result>::Make();
return Done<Result>::Make(p->cancelled());
}
}
return Pending();

@ -35,6 +35,24 @@ namespace grpc_core {
template <typename T, uint8_t kQueueSize>
class InterActivityPipe {
public:
class NextResult {
public:
template <typename... Args>
explicit NextResult(Args&&... args) : value_(std::forward<Args>(args)...) {}
using value_type = T;
void reset() { value_.reset(); }
bool cancelled() const { return false; }
bool has_value() const { return value_.has_value(); }
const T& value() const { return value_.value(); }
T& value() { return value_.value(); }
const T& operator*() const { return *value_; }
T& operator*() { return *value_; }
private:
absl::optional<T> value_;
};
private:
class Center : public RefCounted<Center, NonPolymorphicRefCount> {
public:
@ -55,7 +73,7 @@ class InterActivityPipe {
return true;
}
Poll<absl::optional<T>> Next() {
Poll<NextResult> Next() {
ReleasableMutexLock lock(&mu_);
if (count_ == 0) {
if (closed_) return absl::nullopt;

@ -74,7 +74,7 @@ class Observable {
GRPC_MUST_USE_RESULT Waker Add(Observer* observer)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
observers_.insert(observer);
return Activity::current()->MakeNonOwningWaker();
return GetContext<Activity>()->MakeNonOwningWaker();
}
private:

@ -77,6 +77,7 @@ class StatusFlag {
bool ok() const { return value_; }
bool operator==(StatusFlag other) const { return value_ == other.value_; }
std::string ToString() const { return value_ ? "ok" : "failed"; }
template <typename Sink>
friend void AbslStringify(Sink& sink, StatusFlag flag) {

@ -59,9 +59,6 @@ PromiseEndpoint::GetLocalAddress() const {
void PromiseEndpoint::ReadState::Complete(absl::Status status,
size_t num_bytes_requested) {
gpr_log(GPR_ERROR, "PromiseEndpoint::ReadState::Complete: status:%s",
status.ToString().c_str());
if (!status.ok()) {
// Invalidates all previous reads.
pending_buffer.Clear();

@ -229,9 +229,9 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
std::move(client_initial_metadata));
});
// Read messages from handler into initiator.
call_handler.SpawnGuarded(
"read_messages", [call_handler, call_initiator]() mutable {
return ForEach(OutgoingMessages(call_handler),
call_handler.SpawnGuarded("read_messages", [call_handler,
call_initiator]() mutable {
return Seq(ForEach(OutgoingMessages(call_handler),
[call_initiator](MessageHandle msg) mutable {
// Need to spawn a job into the initiator's activity to
// push the message in.
@ -241,32 +241,47 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
return call_initiator.CancelIfFails(
call_initiator.PushMessage(std::move(msg)));
});
});
});
}),
[call_initiator](StatusFlag result) mutable {
call_initiator.SpawnInfallible(
"finish-downstream", [call_initiator, result]() mutable {
if (result.ok()) {
call_initiator.FinishSends();
} else {
call_initiator.Cancel();
}
return Empty{};
});
return result;
});
});
call_initiator.SpawnInfallible("read_the_things", [call_initiator,
call_handler]() mutable {
return Seq(
call_initiator.CancelIfFails(TrySeq(
call_initiator.PullServerInitialMetadata(),
[call_handler](ServerMetadataHandle md) mutable {
[call_handler,
call_initiator](absl::optional<ServerMetadataHandle> md) mutable {
const bool has_md = md.has_value();
call_handler.SpawnGuarded(
"recv_initial_metadata",
[md = std::move(md), call_handler]() mutable {
return call_handler.PushServerInitialMetadata(
std::move(md));
});
return Success{};
},
ForEach(OutgoingMessages(call_initiator),
[call_handler](MessageHandle msg) mutable {
return call_handler.SpawnWaitable(
"recv_message",
[msg = std::move(msg), call_handler]() mutable {
return call_handler.CancelIfFails(
call_handler.PushMessage(std::move(msg)));
});
}),
ImmediateOkStatus())),
return If(
has_md,
ForEach(OutgoingMessages(call_initiator),
[call_handler](MessageHandle msg) mutable {
return call_handler.SpawnWaitable(
"recv_message",
[msg = std::move(msg), call_handler]() mutable {
return call_handler.CancelIfFails(
call_handler.PushMessage(std::move(msg)));
});
}),
[]() -> StatusFlag { return Success{}; });
})),
call_initiator.PullServerTrailingMetadata(),
[call_handler](ServerMetadataHandle md) mutable {
call_handler.SpawnGuarded(

@ -54,10 +54,11 @@
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -257,6 +258,10 @@ class CallSpineInterface {
"SpawnGuarded promise must return a status-like object");
party().Spawn(name, std::move(promise_factory), [this](ResultType r) {
if (!IsStatusOk(r)) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s",
r.ToString().c_str());
}
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(std::move(r)));
}
});
@ -357,22 +362,26 @@ class CallInitiator {
auto PullServerInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
[](NextResult<ServerMetadataHandle> md)
-> ValueOrFailure<absl::optional<ServerMetadataHandle>> {
if (!md.has_value()) {
if (md.cancelled()) return Failure{};
return absl::optional<ServerMetadataHandle>();
}
return absl::optional<ServerMetadataHandle>(std::move(*md));
});
}
auto PullServerTrailingMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Race(spine_->WaitForCancel(),
Map(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](NextResult<ServerMetadataHandle> md)
-> ServerMetadataHandle {
GPR_ASSERT(md.has_value());
return std::move(*md);
}));
return PrioritizedRace(
Map(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](
NextResult<ServerMetadataHandle> md) -> ServerMetadataHandle {
GPR_ASSERT(md.has_value());
return std::move(*md);
}),
spine_->WaitForCancel());
}
auto PullMessage() {
@ -439,15 +448,26 @@ class CallHandler {
});
}
auto PushServerInitialMetadata(ServerMetadataHandle md) {
auto PushServerInitialMetadata(absl::optional<ServerMetadataHandle> md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
return If(
md.has_value(),
[&md, this]() {
return Map(
spine_->server_initial_metadata().sender.Push(std::move(*md)),
[](bool ok) { return StatusFlag(ok); });
},
[this]() {
spine_->server_initial_metadata().sender.Close();
return []() -> StatusFlag { return Success{}; };
});
}
auto PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->server_initial_metadata().sender.Close();
spine_->server_to_client_messages().sender.Close();
spine_->client_to_server_messages().receiver.CloseWithError();
spine_->CallOnDone();
return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });

@ -38,12 +38,12 @@ TEST(InterActivityPipe, CanSendAndReceive) {
return absl::OkStatus();
}));
EXPECT_FALSE(done);
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, 3);
done = true;
return absl::OkStatus();
}));
auto b = TestActivity(Seq(pipe.receiver.Next(),
[&done](InterActivityPipe<int, 1>::NextResult n) {
EXPECT_EQ(n.value(), 3);
done = true;
return absl::OkStatus();
}));
EXPECT_TRUE(done);
}
@ -63,12 +63,12 @@ TEST(InterActivityPipe, CanSendTwiceAndReceive) {
EXPECT_FALSE(done);
auto b = TestActivity(Seq(
pipe.receiver.Next(),
[&pipe](absl::optional<int> n) {
EXPECT_EQ(n, 3);
[&pipe](InterActivityPipe<int, 1>::NextResult n) {
EXPECT_EQ(n.value(), 3);
return pipe.receiver.Next();
},
[&done](absl::optional<int> n) {
EXPECT_EQ(n, 4);
[&done](InterActivityPipe<int, 1>::NextResult n) {
EXPECT_EQ(n.value(), 4);
done = true;
return absl::OkStatus();
}));
@ -78,12 +78,12 @@ TEST(InterActivityPipe, CanSendTwiceAndReceive) {
TEST(InterActivityPipe, CanReceiveAndSend) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, 3);
done = true;
return absl::OkStatus();
}));
auto b = TestActivity(Seq(pipe.receiver.Next(),
[&done](InterActivityPipe<int, 1>::NextResult n) {
EXPECT_EQ(n.value(), 3);
done = true;
return absl::OkStatus();
}));
EXPECT_FALSE(done);
auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) {
EXPECT_TRUE(b);
@ -95,12 +95,12 @@ TEST(InterActivityPipe, CanReceiveAndSend) {
TEST(InterActivityPipe, CanClose) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, absl::nullopt);
done = true;
return absl::OkStatus();
}));
auto b = TestActivity(Seq(pipe.receiver.Next(),
[&done](InterActivityPipe<int, 1>::NextResult n) {
EXPECT_FALSE(n.has_value());
done = true;
return absl::OkStatus();
}));
EXPECT_FALSE(done);
// Drop the sender
{ auto x = std::move(pipe.sender); }

@ -186,7 +186,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},
@ -231,7 +231,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},
@ -294,7 +294,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
"test-read-1", [&on_done1, initiator = call1.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},
@ -310,7 +310,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
"test-read-2", [&on_done2, initiator = call2.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},
@ -365,7 +365,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
"test-read", [&on_done1, initiator = call1.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},
@ -381,7 +381,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
"test-read", [&on_done2, initiator = call2.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
return Empty{};
},

@ -135,11 +135,14 @@ TEST_F(TransportTest, AddOneStream) {
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(
md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/demo.Service/Step");
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(md.value()
.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
initiator.PullMessage(),
@ -219,11 +222,14 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(
md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/demo.Service/Step");
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(md.value()
.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
initiator.PullMessage(),

@ -0,0 +1,165 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
"grpc_package",
"grpc_proto_library",
)
load("grpc_transport_test.bzl", "grpc_transport_test")
grpc_package(name = "test/core/transport/test_suite")
grpc_cc_library(
name = "fixture",
testonly = 1,
srcs = ["fixture.cc"],
hdrs = ["fixture.h"],
deps = [
"//:grpc",
"//test/core/event_engine/fuzzing_event_engine",
],
)
grpc_cc_library(
name = "inproc_fixture",
testonly = 1,
srcs = ["inproc_fixture.cc"],
deps = [
"fixture",
"//src/core:grpc_transport_inproc",
],
alwayslink = 1,
)
grpc_cc_library(
name = "chaotic_good_fixture",
testonly = 1,
srcs = ["chaotic_good_fixture.cc"],
external_deps = ["gtest"],
deps = [
"fixture",
"//src/core:chaotic_good_client_transport",
"//src/core:chaotic_good_server_transport",
"//src/core:event_engine_memory_allocator_factory",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:grpc_promise_endpoint",
"//src/core:resource_quota",
],
alwayslink = 1,
)
grpc_cc_library(
name = "test",
testonly = 1,
srcs = ["test.cc"],
hdrs = ["test.h"],
external_deps = [
"absl/functional:any_invocable",
"absl/random",
"absl/random:bit_gen_ref",
"absl/strings",
"gtest",
],
deps = [
"fixture",
"//:iomgr_timer",
"//:promise",
"//src/core:cancel_callback",
"//src/core:resource_quota",
"//src/core:time",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
],
)
grpc_cc_library(
name = "test_main",
testonly = 1,
srcs = ["test_main.cc"],
external_deps = ["absl/random"],
deps = [
"fixture",
"test",
"//:grpc_trace",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_library(
name = "call_content",
testonly = 1,
srcs = ["call_content.cc"],
external_deps = ["gtest"],
deps = ["test"],
alwayslink = 1,
)
grpc_cc_library(
name = "call_shapes",
testonly = 1,
srcs = ["call_shapes.cc"],
deps = ["test"],
alwayslink = 1,
)
grpc_cc_library(
name = "no_op",
testonly = 1,
srcs = ["no_op.cc"],
deps = ["test"],
alwayslink = 1,
)
grpc_cc_library(
name = "stress",
testonly = 1,
srcs = ["stress.cc"],
external_deps = ["absl/random"],
deps = ["test"],
alwayslink = 1,
)
grpc_proto_library(
name = "fuzzer_proto",
srcs = ["fuzzer.proto"],
has_services = False,
deps = [
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/util:fuzz_config_vars_proto",
],
)
grpc_transport_test(
name = "inproc",
deps = [
":call_content",
":call_shapes",
":inproc_fixture",
":no_op",
":stress",
],
)
grpc_transport_test(
name = "chaotic_good",
deps = [
":call_content",
":call_shapes",
":chaotic_good_fixture",
":no_op",
":stress",
],
)

@ -0,0 +1,151 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "gmock/gmock.h"
#include "test/core/transport/test_suite/test.h"
using testing::UnorderedElementsAreArray;
namespace grpc_core {
namespace {
class LoweringEncoder {
public:
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(metadata_);
}
void Encode(const Slice& key, const Slice& value) {
metadata_.emplace_back(key.as_string_view(), value.as_string_view());
}
template <typename Which>
void Encode(Which, const typename Which::ValueType& value) {
metadata_.emplace_back(Which::key(), Which::Encode(value).as_string_view());
}
private:
std::vector<std::pair<std::string, std::string>> metadata_;
};
std::vector<std::pair<std::string, std::string>> LowerMetadata(
const grpc_metadata_batch& metadata) {
LoweringEncoder encoder;
metadata.Encode(&encoder);
return encoder.Take();
}
void FillMetadata(const std::vector<std::pair<std::string, std::string>>& md,
grpc_metadata_batch& out) {
for (const auto& p : md) {
out.Append(p.first, Slice::FromCopiedString(p.second),
[&](absl::string_view error, const Slice& value) {
Crash(absl::StrCat(
"Failed to parse metadata for '", p.first, "': ", error,
" value=", absl::CEscape(value.as_string_view())));
});
}
}
} // namespace
TRANSPORT_TEST(UnaryWithSomeContent) {
SetServerAcceptor();
auto initiator = CreateCall();
const auto client_initial_metadata = RandomMetadata();
const auto server_initial_metadata = RandomMetadata();
const auto server_trailing_metadata = RandomMetadata();
const auto client_payload = RandomMessage();
const auto server_payload = RandomMessage();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
FillMetadata(client_initial_metadata, *md);
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_payload)), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_THAT(LowerMetadata(***md),
UnorderedElementsAreArray(server_initial_metadata));
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), server_payload);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_THAT(LowerMetadata(**md),
UnorderedElementsAreArray(server_trailing_metadata));
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_THAT(LowerMetadata(**md),
UnorderedElementsAreArray(client_initial_metadata));
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), client_payload);
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
FillMetadata(server_initial_metadata, *md);
return handler.PushServerInitialMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(server_payload)), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
FillMetadata(server_trailing_metadata, *md);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
} // namespace grpc_core

@ -0,0 +1,681 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/test_suite/test.h"
namespace grpc_core {
TRANSPORT_TEST(MetadataOnlyRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> got_md) {
EXPECT_TRUE(got_md.ok());
EXPECT_EQ(
got_md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
// Don't wait for end of stream for client->server messages, just
// publish initial then trailing metadata.
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_FALSE(md.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> got_md) {
EXPECT_TRUE(got_md.ok());
EXPECT_EQ(
got_md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
// Don't wait for end of stream for client->server messages, just
// and don't send initial metadata - just trailing metadata.
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "start-call",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(initiator, "end-call", [&]() {
initiator.Cancel();
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(UnaryRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
auto md_out = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md_out->Set(ContentTypeMetadata(),
ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md_out));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(ClientStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world (2)")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world (3)")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world (4)")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world (5)")), 0));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
auto md_out = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md_out->Set(ContentTypeMetadata(),
ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md_out));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (2)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (3)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (4)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (5)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
TRANSPORT_TEST(ServerStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor (2)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor (3)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor (4)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor (5)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
"why hello neighbor (6)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
auto handler = TickUntilServerCall();
SpawnTestSeq(
handler, "handler", [&] { return handler.PullClientInitialMetadata(); },
[&](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/foo/bar");
auto md_out = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md_out->Set(ContentTypeMetadata(),
ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md_out));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor (2)")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor (3)")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor (4)")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor (5)")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor (6)")), 0));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
WaitForAllPendingWork();
}
} // namespace grpc_core

@ -0,0 +1,117 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include "gmock/gmock.h"
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include "src/core/ext/transport/chaotic_good/server_transport.h"
#include "src/core/lib/event_engine/memory_allocator_factory.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "test/core/transport/test_suite/fixture.h"
using grpc_event_engine::experimental::EndpointConfig;
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::FuzzingEventEngine;
using grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
using grpc_event_engine::experimental::URIToResolvedAddress;
namespace grpc_core {
namespace {
class MockEndpointConfig : public EndpointConfig {
public:
MOCK_METHOD(absl::optional<int>, GetInt, (absl::string_view key),
(const, override));
MOCK_METHOD(absl::optional<absl::string_view>, GetString,
(absl::string_view key), (const, override));
MOCK_METHOD(void*, GetVoidPointer, (absl::string_view key),
(const, override));
};
struct EndpointPair {
std::unique_ptr<PromiseEndpoint> client;
std::unique_ptr<PromiseEndpoint> server;
};
EndpointPair CreateEndpointPair(
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
ResourceQuotaRefPtr resource_quota, int port) {
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
const auto resolved_address =
URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value();
::testing::StrictMock<MockEndpointConfig> endpoint_config;
auto listener = *event_engine->CreateListener(
[&server_endpoint](std::unique_ptr<EventEngine::Endpoint> endpoint,
MemoryAllocator) {
server_endpoint = std::move(endpoint);
},
[](absl::Status) {}, endpoint_config,
std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
resource_quota->memory_quota()));
GPR_ASSERT(listener->Bind(resolved_address).ok());
GPR_ASSERT(listener->Start().ok());
event_engine->Connect(
[&client_endpoint](
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
GPR_ASSERT(endpoint.ok());
client_endpoint = std::move(endpoint).value();
},
resolved_address, endpoint_config,
resource_quota->memory_quota()->CreateMemoryAllocator("client"),
Duration::Hours(3));
while (client_endpoint == nullptr || server_endpoint == nullptr) {
event_engine->Tick();
}
return EndpointPair{std::make_unique<PromiseEndpoint>(
std::move(client_endpoint), SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::move(server_endpoint), SliceBuffer())};
}
} // namespace
TRANSPORT_FIXTURE(ChaoticGood) {
auto resource_quota = MakeResourceQuota("test");
EndpointPair control_endpoints =
CreateEndpointPair(event_engine.get(), resource_quota, 1234);
EndpointPair data_endpoints =
CreateEndpointPair(event_engine.get(), resource_quota, 4321);
auto channel_args =
ChannelArgs()
.SetObject(resource_quota)
.SetObject(std::static_pointer_cast<EventEngine>(event_engine));
auto client_transport =
MakeOrphanable<chaotic_good::ChaoticGoodClientTransport>(
std::move(control_endpoints.client), std::move(data_endpoints.client),
event_engine);
auto server_transport =
MakeOrphanable<chaotic_good::ChaoticGoodServerTransport>(
channel_args, std::move(control_endpoints.server),
std::move(data_endpoints.server), event_engine);
return ClientAndServerTransportPair{std::move(client_transport),
std::move(server_transport)};
}
} // namespace grpc_core

@ -0,0 +1,29 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/test_suite/fixture.h"
namespace grpc_core {
TransportFixtureRegistry& TransportFixtureRegistry::Get() {
static TransportFixtureRegistry* registry = new TransportFixtureRegistry();
return *registry;
}
void TransportFixtureRegistry::RegisterFixture(
absl::string_view name,
absl::AnyInvocable<TransportFixture*() const> create) {
fixtures_.push_back({name, std::move(create)});
}
} // namespace grpc_core

@ -0,0 +1,77 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H
#define GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H
#include "src/core/lib/transport/transport.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
namespace grpc_core {
class TransportFixture {
public:
struct ClientAndServerTransportPair {
OrphanablePtr<Transport> client;
OrphanablePtr<Transport> server;
};
virtual ~TransportFixture() = default;
virtual ClientAndServerTransportPair CreateTransportPair(
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine) = 0;
};
class TransportFixtureRegistry {
public:
static TransportFixtureRegistry& Get();
void RegisterFixture(absl::string_view name,
absl::AnyInvocable<TransportFixture*() const> create);
struct Fixture {
absl::string_view name;
absl::AnyInvocable<TransportFixture*() const> create;
};
const std::vector<Fixture>& fixtures() const { return fixtures_; }
private:
std::vector<Fixture> fixtures_;
};
} // namespace grpc_core
#define TRANSPORT_FIXTURE(name) \
class TransportFixture_##name : public grpc_core::TransportFixture { \
public: \
using TransportFixture::TransportFixture; \
ClientAndServerTransportPair CreateTransportPair( \
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> \
event_engine) override; \
\
private: \
static grpc_core::TransportFixture* Create() { \
return new TransportFixture_##name(); \
} \
static int registered_; \
}; \
int TransportFixture_##name::registered_ = \
(grpc_core::TransportFixtureRegistry::Get().RegisterFixture( \
#name, &TransportFixture_##name::Create), \
0); \
grpc_core::TransportFixture::ClientAndServerTransportPair \
TransportFixture_##name::CreateTransportPair( \
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> \
event_engine GRPC_UNUSED)
#endif // GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_FIXTURE_H

@ -0,0 +1,29 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package transport_test_suite;
import "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto";
import "test/core/util/fuzz_config_vars.proto";
message Msg {
uint32 test_id = 1;
uint32 fixture_id = 2;
fuzzing_event_engine.Actions event_engine_actions = 10;
grpc.testing.FuzzConfigVars config_vars = 11;
repeated uint64 rng = 12;
}

@ -0,0 +1,72 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdio.h>
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/env.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/transport/test_suite/fixture.h"
#include "test/core/transport/test_suite/fuzzer.pb.h"
#include "test/core/transport/test_suite/test.h"
#include "test/core/util/fuzz_config_vars.h"
#include "test/core/util/proto_bit_gen.h"
namespace grpc_event_engine {
namespace experimental {
extern bool g_event_engine_supports_fd;
}
} // namespace grpc_event_engine
bool squelch = true;
static void dont_log(gpr_log_func_args* /*args*/) {}
DEFINE_PROTO_FUZZER(const transport_test_suite::Msg& msg) {
const auto& tests = grpc_core::TransportTestRegistry::Get().tests();
const auto& fixtures = grpc_core::TransportFixtureRegistry::Get().fixtures();
GPR_ASSERT(!tests.empty());
GPR_ASSERT(!fixtures.empty());
const int test_id = msg.test_id() % tests.size();
const int fixture_id = msg.fixture_id() % fixtures.size();
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
grpc_core::ConfigVars::Overrides overrides =
grpc_core::OverridesFromFuzzConfigVars(msg.config_vars());
grpc_core::ConfigVars::SetOverrides(overrides);
grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
if (!squelch) {
fprintf(stderr, "RUN TEST '%s' with fixture '%s'\n",
std::string(tests[test_id].name).c_str(),
std::string(fixtures[fixture_id].name).c_str());
}
grpc_core::ProtoBitGen bitgen(msg.rng());
auto test =
tests[test_id].create(std::unique_ptr<grpc_core::TransportFixture>(
fixtures[fixture_id].create()),
msg.event_engine_actions(), bitgen);
test->RunTest();
delete test;
GPR_ASSERT(!::testing::Test::HasFailure());
}

@ -0,0 +1,58 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Generate one transport test & associated fuzzer
"""
load("//bazel:grpc_build_system.bzl", "grpc_cc_test")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
def grpc_transport_test(name, deps):
grpc_cc_test(
name = name + "_test",
srcs = [],
tags = ["no_windows", "no_mac"],
deps = [
":test_main",
] + deps,
)
grpc_proto_fuzzer(
name = name + "_fuzzer",
srcs = ["fuzzer_main.cc"],
tags = ["no_windows", "no_mac"],
external_deps = [
"gtest",
],
deps = [
":test",
":fixture",
":fuzzer_proto",
"//:event_engine_base_hdrs",
"//:config_vars",
"//:exec_ctx",
"//:gpr",
"//:grpc_unsecure",
"//:iomgr_timer",
"//src/core:default_event_engine",
"//src/core:env",
"//src/core:experiments",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/util:fuzz_config_vars",
"//test/core/util:proto_bit_gen",
] + deps,
corpus = "corpus/%s" % name,
proto = None,
)

@ -0,0 +1,25 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "test/core/transport/test_suite/fixture.h"
namespace grpc_core {
TRANSPORT_FIXTURE(Inproc) {
auto transports = MakeInProcessTransportPair();
return {std::move(transports.first), std::move(transports.second)};
}
} // namespace grpc_core

@ -0,0 +1,28 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/test_suite/test.h"
namespace grpc_core {
TRANSPORT_TEST(NoOp) {}
TRANSPORT_TEST(WaitForAllPendingWork) { WaitForAllPendingWork(); }
TRANSPORT_TEST(SetServerAcceptorAndFinish) {
SetServerAcceptor();
WaitForAllPendingWork();
}
} // namespace grpc_core

@ -0,0 +1,131 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/random/random.h"
#include "test/core/transport/test_suite/test.h"
namespace grpc_core {
TRANSPORT_TEST(ManyUnaryRequests) {
SetServerAcceptor();
const int kNumRequests = absl::LogUniform<int>(rng(), 10, 100);
std::list<std::string> call_names;
auto make_call_name = [&call_names](int i,
absl::string_view suffix) -> const char* {
call_names.emplace_back(absl::StrCat("call-", i, "-", suffix));
return call_names.back().c_str();
};
std::map<int, std::string> client_messages;
std::map<int, std::string> server_messages;
for (int i = 0; i < kNumRequests; i++) {
auto initiator = CreateCall();
client_messages[i] = RandomMessage();
server_messages[i] = RandomMessage();
SpawnTestSeq(
initiator, make_call_name(i, "initiator"),
[initiator, i]() mutable {
auto md = Arena::MakePooled<ClientMetadata>(GetContext<Arena>());
md->Set(HttpPathMetadata(),
Slice::FromCopiedString(std::to_string(i)));
return initiator.PushClientInitialMetadata(std::move(md));
},
[initiator, i, &client_messages](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_messages[i])), 0));
},
[initiator](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[initiator](
ValueOrFailure<absl::optional<ServerMetadataHandle>> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[initiator, i,
&server_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
server_messages[i]);
return initiator.PullMessage();
},
[initiator](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
return initiator.PullServerTrailingMetadata();
},
[initiator](ValueOrFailure<ServerMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
}
for (int i = 0; i < kNumRequests; i++) {
auto handler = TickUntilServerCall();
auto this_call_index = std::make_shared<int>(-1);
SpawnTestSeq(
handler, make_call_name(i, "handler"),
[handler]() mutable { return handler.PullClientInitialMetadata(); },
[handler,
this_call_index](ValueOrFailure<ServerMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(absl::SimpleAtoi(
md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
&*this_call_index));
return handler.PullMessage();
},
[handler, this_call_index,
&client_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
client_messages[*this_call_index]);
return handler.PullMessage();
},
[handler](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[handler, this_call_index,
&server_messages](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(
Slice::FromCopiedString(server_messages[*this_call_index])),
0));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>(GetContext<Arena>());
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return Empty{};
});
}
WaitForAllPendingWork();
}
} // namespace grpc_core

@ -0,0 +1,271 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/test_suite/test.h"
#include <initializer_list>
#include "absl/random/random.h"
namespace grpc_core {
///////////////////////////////////////////////////////////////////////////////
// TransportTestRegistry
TransportTestRegistry& TransportTestRegistry::Get() {
static TransportTestRegistry* registry = new TransportTestRegistry();
return *registry;
}
void TransportTestRegistry::RegisterTest(
absl::string_view name,
absl::AnyInvocable<TransportTest*(std::unique_ptr<TransportFixture>,
const fuzzing_event_engine::Actions&,
absl::BitGenRef) const>
create) {
if (absl::StartsWith(name, "DISABLED_")) return;
tests_.push_back({name, std::move(create)});
}
///////////////////////////////////////////////////////////////////////////////
// TransportTest
void TransportTest::RunTest() {
TestImpl();
EXPECT_EQ(pending_actions_.size(), 0)
<< "There are still pending actions: did you forget to call "
"WaitForAllPendingWork()?";
transport_pair_.client.reset();
transport_pair_.server.reset();
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
void TransportTest::SetServerAcceptor() {
transport_pair_.server->server_transport()->SetAcceptor(&acceptor_);
}
CallInitiator TransportTest::CreateCall() {
auto call = MakeCall(event_engine_.get(), Arena::Create(1024, &allocator_));
call.handler.SpawnInfallible("start-call", [this, handler = call.handler]() {
transport_pair_.client->client_transport()->StartCall(handler);
return Empty{};
});
return std::move(call.initiator);
}
CallHandler TransportTest::TickUntilServerCall() {
WatchDog watchdog(this);
for (;;) {
auto handler = acceptor_.PopHandler();
if (handler.has_value()) return std::move(*handler);
event_engine_->Tick();
}
}
void TransportTest::WaitForAllPendingWork() {
WatchDog watchdog(this);
while (!pending_actions_.empty()) {
if (pending_actions_.front()->IsDone()) {
pending_actions_.pop();
continue;
}
event_engine_->Tick();
}
}
void TransportTest::Timeout() {
std::vector<std::string> lines;
lines.emplace_back("Timeout waiting for pending actions to complete");
while (!pending_actions_.empty()) {
auto action = std::move(pending_actions_.front());
pending_actions_.pop();
if (action->IsDone()) continue;
absl::string_view state_name =
transport_test_detail::ActionState::StateString(action->Get());
absl::string_view file_name = action->file();
auto pos = file_name.find_last_of('/');
if (pos != absl::string_view::npos) {
file_name = file_name.substr(pos + 1);
}
lines.emplace_back(absl::StrCat(" ", state_name, " ", action->name(), " [",
action->step(), "]: ", file_name, ":",
action->line()));
}
Crash(absl::StrJoin(lines, "\n"));
}
std::string TransportTest::RandomString(int min_length, int max_length,
absl::string_view character_set) {
std::string out;
int length = absl::LogUniform<int>(rng_, min_length, max_length + 1);
for (int i = 0; i < length; ++i) {
out.push_back(
character_set[absl::Uniform<uint8_t>(rng_, 0, character_set.size())]);
}
return out;
}
std::string TransportTest::RandomStringFrom(
std::initializer_list<absl::string_view> choices) {
size_t idx = absl::Uniform<size_t>(rng_, 0, choices.size());
auto it = choices.begin();
for (size_t i = 0; i < idx; ++i) ++it;
return std::string(*it);
}
std::string TransportTest::RandomMetadataKey() {
if (absl::Bernoulli(rng_, 0.1)) {
return RandomStringFrom({
":path",
":method",
":status",
":authority",
":scheme",
});
}
std::string out;
do {
out = RandomString(1, 128, "abcdefghijklmnopqrstuvwxyz-_");
} while (absl::EndsWith(out, "-bin"));
return out;
}
std::string TransportTest::RandomMetadataValue(absl::string_view key) {
if (key == ":method") {
return RandomStringFrom({"GET", "POST", "PUT"});
}
if (key == ":status") {
return absl::StrCat(absl::Uniform<int>(rng_, 100, 600));
}
if (key == ":scheme") {
return RandomStringFrom({"http", "https"});
}
if (key == "te") {
return "trailers";
}
static const NoDestruct<std::string> kChars{[]() {
std::string out;
for (char c = 32; c < 127; c++) out.push_back(c);
return out;
}()};
return RandomString(0, 128, *kChars);
}
std::string TransportTest::RandomMetadataBinaryKey() {
return RandomString(1, 128, "abcdefghijklmnopqrstuvwxyz-_") + "-bin";
}
std::string TransportTest::RandomMetadataBinaryValue() {
static const NoDestruct<std::string> kChars{[]() {
std::string out;
for (int c = 0; c < 256; c++) {
out.push_back(static_cast<char>(static_cast<uint8_t>(c)));
}
return out;
}()};
return RandomString(0, 4096, *kChars);
}
std::vector<std::pair<std::string, std::string>>
TransportTest::RandomMetadata() {
size_t size = 0;
const size_t max_size = absl::LogUniform<size_t>(rng_, 64, 8000);
std::vector<std::pair<std::string, std::string>> out;
for (;;) {
std::string key;
std::string value;
if (absl::Bernoulli(rng_, 0.1)) {
key = RandomMetadataBinaryKey();
value = RandomMetadataBinaryValue();
} else {
key = RandomMetadataKey();
value = RandomMetadataValue(key);
}
bool include = true;
for (size_t i = 0; i < out.size(); ++i) {
if (out[i].first == key) {
include = false;
break;
}
}
if (!include) continue;
size_t this_size = 32 + key.size() + value.size();
if (size + this_size > max_size) {
if (out.empty()) continue;
break;
}
size += this_size;
out.emplace_back(std::move(key), std::move(value));
}
return out;
}
std::string TransportTest::RandomMessage() {
static const NoDestruct<std::string> kChars{[]() {
std::string out;
for (int c = 0; c < 256; c++) {
out.push_back(static_cast<char>(static_cast<uint8_t>(c)));
}
return out;
}()};
return RandomString(0, 1024 * 1024, *kChars);
}
///////////////////////////////////////////////////////////////////////////////
// TransportTest::Acceptor
Arena* TransportTest::Acceptor::CreateArena() {
return Arena::Create(1024, allocator_);
}
absl::StatusOr<CallInitiator> TransportTest::Acceptor::CreateCall(
ClientMetadata&, Arena* arena) {
auto call = MakeCall(event_engine_, arena);
handlers_.push(std::move(call.handler));
return std::move(call.initiator);
}
absl::optional<CallHandler> TransportTest::Acceptor::PopHandler() {
if (!handlers_.empty()) {
auto handler = std::move(handlers_.front());
handlers_.pop();
return handler;
}
return absl::nullopt;
}
///////////////////////////////////////////////////////////////////////////////
// ActionState
namespace transport_test_detail {
ActionState::ActionState(NameAndLocation name_and_location)
: name_and_location_(name_and_location), state_(kNotCreated) {}
bool ActionState::IsDone() {
switch (state_) {
case kNotCreated:
case kNotStarted:
case kStarted:
return false;
case kDone:
case kCancelled:
return true;
}
}
} // namespace transport_test_detail
} // namespace grpc_core

@ -0,0 +1,364 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H
#define GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H
#include <initializer_list>
#include <memory>
#include <queue>
#include "absl/functional/any_invocable.h"
#include "absl/random/bit_gen_ref.h"
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/transport/test_suite/fixture.h"
namespace grpc_core {
namespace transport_test_detail {
struct NameAndLocation {
// NOLINTNEXTLINE
NameAndLocation(const char* name, SourceLocation location = {})
: location_(location), name_(name) {}
NameAndLocation Next() const {
return NameAndLocation(name_, location_, step_ + 1);
}
SourceLocation location() const { return location_; }
absl::string_view name() const { return name_; }
int step() const { return step_; }
private:
NameAndLocation(absl::string_view name, SourceLocation location, int step)
: location_(location), name_(name), step_(step) {}
SourceLocation location_;
absl::string_view name_;
int step_ = 1;
};
class ActionState {
public:
enum State : uint8_t {
kNotCreated,
kNotStarted,
kStarted,
kDone,
kCancelled,
};
static absl::string_view StateString(State state) {
switch (state) {
case kNotCreated:
return "🚦";
case kNotStarted:
return "";
case kStarted:
return "🚗";
case kDone:
return "🏁";
case kCancelled:
return "💥";
}
}
explicit ActionState(NameAndLocation name_and_location);
State Get() const { return state_; }
void Set(State state) {
gpr_log(GPR_INFO, "%s",
absl::StrCat(StateString(state), " ", name(), " [", step(), "] ",
file(), ":", line())
.c_str());
state_ = state;
}
const NameAndLocation& name_and_location() const {
return name_and_location_;
}
SourceLocation location() const { return name_and_location().location(); }
const char* file() const { return location().file(); }
int line() const { return location().line(); }
absl::string_view name() const { return name_and_location().name(); }
int step() const { return name_and_location().step(); }
bool IsDone();
private:
const NameAndLocation name_and_location_;
std::atomic<State> state_;
};
using PromiseSpawner = std::function<void(absl::string_view, Promise<Empty>)>;
using ActionStateFactory =
absl::FunctionRef<std::shared_ptr<ActionState>(NameAndLocation)>;
template <typename Context>
PromiseSpawner SpawnerForContext(
Context context,
grpc_event_engine::experimental::EventEngine* event_engine) {
return [context = std::move(context), event_engine](
absl::string_view name, Promise<Empty> promise) mutable {
// Pass new promises via event engine to allow fuzzers to explore
// reorderings of possibly interleaved spawns.
event_engine->Run([name, context = std::move(context),
promise = std::move(promise)]() mutable {
context.SpawnInfallible(name, std::move(promise));
});
};
}
template <typename Arg>
using NextSpawner = absl::AnyInvocable<void(Arg)>;
template <typename R>
Promise<Empty> WrapPromiseAndNext(std::shared_ptr<ActionState> action_state,
Promise<R> promise, NextSpawner<R> next) {
return Promise<Empty>(OnCancel(
[action_state, promise = std::move(promise),
next = std::move(next)]() mutable -> Poll<Empty> {
action_state->Set(ActionState::kStarted);
auto r = promise();
if (auto* p = r.value_if_ready()) {
action_state->Set(ActionState::kDone);
next(std::move(*p));
return Empty{};
} else {
return Pending{};
}
},
[action_state]() { action_state->Set(ActionState::kCancelled); }));
}
template <typename Arg>
NextSpawner<Arg> WrapFollowUps(NameAndLocation, ActionStateFactory,
PromiseSpawner) {
return [](Empty) {};
}
template <typename Arg, typename FirstFollowUp, typename... FollowUps>
NextSpawner<Arg> WrapFollowUps(NameAndLocation loc,
ActionStateFactory action_state_factory,
PromiseSpawner spawner, FirstFollowUp first,
FollowUps... follow_ups) {
using Factory = promise_detail::OncePromiseFactory<Arg, FirstFollowUp>;
using FactoryPromise = typename Factory::Promise;
using Result = typename FactoryPromise::Result;
auto action_state = action_state_factory(loc);
return [spawner, factory = Factory(std::move(first)),
next = WrapFollowUps<Result>(loc.Next(), action_state_factory,
spawner, std::move(follow_ups)...),
action_state = std::move(action_state),
name = loc.name()](Arg arg) mutable {
action_state->Set(ActionState::kNotStarted);
spawner(name,
WrapPromiseAndNext(std::move(action_state),
Promise<Result>(factory.Make(std::move(arg))),
std::move(next)));
};
}
template <typename First, typename... FollowUps>
void StartSeq(NameAndLocation loc, ActionStateFactory action_state_factory,
PromiseSpawner spawner, First first, FollowUps... followups) {
using Factory = promise_detail::OncePromiseFactory<void, First>;
using FactoryPromise = typename Factory::Promise;
using Result = typename FactoryPromise::Result;
auto action_state = action_state_factory(loc);
auto next = WrapFollowUps<Result>(loc.Next(), action_state_factory, spawner,
std::move(followups)...);
spawner(
loc.name(),
[spawner, first = Factory(std::move(first)), next = std::move(next),
action_state = std::move(action_state), name = loc.name()]() mutable {
action_state->Set(ActionState::kNotStarted);
spawner(name, WrapPromiseAndNext(std::move(action_state),
Promise<Result>(first.Make()),
std::move(next)));
return Empty{};
});
}
}; // namespace transport_test_detail
class TransportTest : public ::testing::Test {
public:
void RunTest();
protected:
TransportTest(std::unique_ptr<TransportFixture> fixture,
const fuzzing_event_engine::Actions& actions,
absl::BitGenRef rng)
: event_engine_(std::make_shared<
grpc_event_engine::experimental::FuzzingEventEngine>(
[]() {
grpc_timer_manager_set_threading(false);
grpc_event_engine::experimental::FuzzingEventEngine::Options
options;
return options;
}(),
actions)),
fixture_(std::move(fixture)),
rng_(rng) {}
void SetServerAcceptor();
CallInitiator CreateCall();
std::string RandomString(int min_length, int max_length,
absl::string_view character_set);
std::string RandomStringFrom(
std::initializer_list<absl::string_view> choices);
std::string RandomMetadataKey();
std::string RandomMetadataValue(absl::string_view key);
std::string RandomMetadataBinaryKey();
std::string RandomMetadataBinaryValue();
std::vector<std::pair<std::string, std::string>> RandomMetadata();
std::string RandomMessage();
absl::BitGenRef rng() { return rng_; }
CallHandler TickUntilServerCall();
void WaitForAllPendingWork();
// Alternative for Seq for test driver code.
// Registers each step so that WaitForAllPendingWork() can report progress,
// and wait for completion... AND generate good failure messages when a
// sequence doesn't complete in a timely manner.
template <typename Context, typename... Actions>
void SpawnTestSeq(Context context,
transport_test_detail::NameAndLocation name_and_location,
Actions... actions) {
transport_test_detail::StartSeq(
name_and_location,
[this](transport_test_detail::NameAndLocation name_and_location) {
auto action = std::make_shared<transport_test_detail::ActionState>(
name_and_location);
pending_actions_.push(action);
return action;
},
transport_test_detail::SpawnerForContext(std::move(context),
event_engine_.get()),
std::move(actions)...);
}
private:
virtual void TestImpl() = 0;
void Timeout();
class Acceptor final : public ServerTransport::Acceptor {
public:
Acceptor(grpc_event_engine::experimental::EventEngine* event_engine,
MemoryAllocator* allocator)
: event_engine_(event_engine), allocator_(allocator) {}
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) override;
absl::optional<CallHandler> PopHandler();
private:
std::queue<CallHandler> handlers_;
grpc_event_engine::experimental::EventEngine* const event_engine_;
MemoryAllocator* const allocator_;
};
class WatchDog {
public:
explicit WatchDog(TransportTest* test) : test_(test) {}
~WatchDog() { test_->event_engine_->Cancel(timer_); }
private:
TransportTest* const test_;
grpc_event_engine::experimental::EventEngine::TaskHandle const timer_{
test_->event_engine_->RunAfter(Duration::Minutes(5),
[this]() { test_->Timeout(); })};
};
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_{
std::make_shared<grpc_event_engine::experimental::FuzzingEventEngine>(
[]() {
grpc_timer_manager_set_threading(false);
grpc_event_engine::experimental::FuzzingEventEngine::Options
options;
return options;
}(),
fuzzing_event_engine::Actions())};
std::unique_ptr<TransportFixture> fixture_;
MemoryAllocator allocator_ = MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator");
Acceptor acceptor_{event_engine_.get(), &allocator_};
TransportFixture::ClientAndServerTransportPair transport_pair_ =
fixture_->CreateTransportPair(event_engine_);
std::queue<std::shared_ptr<transport_test_detail::ActionState>>
pending_actions_;
absl::BitGenRef rng_;
};
class TransportTestRegistry {
public:
static TransportTestRegistry& Get();
void RegisterTest(
absl::string_view name,
absl::AnyInvocable<TransportTest*(std::unique_ptr<TransportFixture>,
const fuzzing_event_engine::Actions&,
absl::BitGenRef) const>
create);
struct Test {
absl::string_view name;
absl::AnyInvocable<TransportTest*(std::unique_ptr<TransportFixture>,
const fuzzing_event_engine::Actions&,
absl::BitGenRef) const>
create;
};
const std::vector<Test>& tests() const { return tests_; }
private:
std::vector<Test> tests_;
};
} // namespace grpc_core
#define TRANSPORT_TEST(name) \
class TransportTest_##name : public grpc_core::TransportTest { \
public: \
using TransportTest::TransportTest; \
void TestBody() override { RunTest(); } \
\
private: \
void TestImpl() override; \
static grpc_core::TransportTest* Create( \
std::unique_ptr<grpc_core::TransportFixture> fixture, \
const fuzzing_event_engine::Actions& actions, absl::BitGenRef rng) { \
return new TransportTest_##name(std::move(fixture), actions, rng); \
} \
static int registered_; \
}; \
int TransportTest_##name::registered_ = \
(grpc_core::TransportTestRegistry::Get().RegisterTest(#name, &Create), \
0); \
void TransportTest_##name::TestImpl()
#endif // GRPC_TEST_CORE_TRANSPORT_TEST_SUITE_TEST_H

@ -0,0 +1,42 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/random/random.h"
#include "src/core/lib/debug/trace.h"
#include "test/core/transport/test_suite/fixture.h"
#include "test/core/transport/test_suite/test.h"
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
absl::BitGen bitgen;
::testing::InitGoogleTest(&argc, argv);
for (const auto& test : grpc_core::TransportTestRegistry::Get().tests()) {
for (const auto& fixture :
grpc_core::TransportFixtureRegistry::Get().fixtures()) {
::testing::RegisterTest(
"TransportTest", absl::StrCat(test.name, "/", fixture.name).c_str(),
nullptr, nullptr, __FILE__, __LINE__,
[test = &test, fixture = &fixture,
&bitgen]() -> grpc_core::TransportTest* {
return test->create(
std::unique_ptr<grpc_core::TransportFixture>(fixture->create()),
fuzzing_event_engine::Actions(), bitgen);
});
}
}
grpc_tracer_init();
return RUN_ALL_TESTS();
}

@ -403,6 +403,7 @@ for dirname in [
"test/core/promise",
"test/core/resource_quota",
"test/core/transport/chaotic_good",
"test/core/transport/test_suite",
"fuzztest",
"fuzztest/core/channel",
"fuzztest/core/transport/chttp2",
@ -423,6 +424,7 @@ for dirname in [
"grpc_cc_library": grpc_cc_library,
"grpc_cc_test": grpc_cc_library,
"grpc_core_end2end_test": lambda **kwargs: None,
"grpc_transport_test": lambda **kwargs: None,
"grpc_fuzzer": grpc_cc_library,
"grpc_fuzz_test": grpc_cc_library,
"grpc_proto_fuzzer": grpc_cc_library,
@ -655,14 +657,28 @@ def make_library(library):
return (library, error, deps, external_deps)
def matches_target(library, target):
if not target.startswith("//"):
if "/" in target:
target = "//" + target
else:
target = "//:" + target
if target == "..." or target == "//...":
return True
if target.endswith("/..."):
return library.startswith(target[:-4])
return library == target
def main() -> None:
update_libraries = []
for library in sorted(consumes.keys()):
if library in no_update:
continue
if args.targets and library not in args.targets:
continue
update_libraries.append(library)
for target in args.targets:
if matches_target(library, target):
update_libraries.append(library)
break
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as p:
updated_libraries = p.map(make_library, update_libraries, 1)

@ -1945,6 +1945,26 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "chaotic_good_test",
"platforms": [
"linux",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -5035,6 +5055,26 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "inproc_test",
"platforms": [
"linux",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save