diff --git a/CMakeLists.txt b/CMakeLists.txt index 75af5a91eb5..437a132db70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1187,6 +1187,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx resolve_address_using_native_resolver_posix_test) endif() add_dependencies(buildtests_cxx resolve_address_using_native_resolver_test) + add_dependencies(buildtests_cxx resource_quota_end2end_stress_test) add_dependencies(buildtests_cxx resource_quota_server_test) add_dependencies(buildtests_cxx resource_quota_test) add_dependencies(buildtests_cxx retry_cancel_after_first_attempt_starts_test) @@ -19886,6 +19887,59 @@ target_link_libraries(resolve_address_using_native_resolver_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(resource_quota_end2end_stress_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h + test/cpp/end2end/resource_quota_end2end_stress_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) +target_compile_features(resource_quota_end2end_stress_test PUBLIC cxx_std_14) +target_include_directories(resource_quota_end2end_stress_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(resource_quota_end2end_stress_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index dee5255bf99..469e807e374 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -12074,6 +12074,19 @@ targets: deps: - grpc_test_util - grpc++_test_config +- name: resource_quota_end2end_stress_test + gtest: true + build: test + language: c++ + headers: [] + src: + - src/proto/grpc/testing/echo.proto + - src/proto/grpc/testing/echo_messages.proto + - src/proto/grpc/testing/simple_messages.proto + - src/proto/grpc/testing/xds/v3/orca_load_report.proto + - test/cpp/end2end/resource_quota_end2end_stress_test.cc + deps: + - grpc++_test_util - name: resource_quota_server_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 56bc93eba7f..76978c9248a 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1841,6 +1841,7 @@ grpc_cc_library( "time", "//:debug_location", "//:event_engine_base_hdrs", + "//:exec_ctx", "//:gpr", "//:grpc_public_hdrs", "//:ref_counted_ptr", @@ -1942,6 +1943,7 @@ grpc_cc_library( "socket_mutator", "status_helper", "//:event_engine_base_hdrs", + "//:exec_ctx", "//:gpr", ], ) diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index b99dde4bc4b..6d90c99c79d 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -49,7 +49,9 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/strerror.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" @@ -549,28 +551,40 @@ void PosixEndpointImpl::MaybeMakeReadSlices() { } } -void PosixEndpointImpl::HandleRead(absl::Status status) { - read_mu_.Lock(); +bool PosixEndpointImpl::HandleReadLocked(absl::Status& status) { if (status.ok() && memory_owner_.is_valid()) { MaybeMakeReadSlices(); if (!TcpDoRead(status)) { UpdateRcvLowat(); // We've consumed the edge, request a new one. - read_mu_.Unlock(); - handle_->NotifyOnRead(on_read_); - return; + return false; } } else { - if (!memory_owner_.is_valid()) { - status = absl::UnknownError("Shutting down endpoint"); + if (!memory_owner_.is_valid() && status.ok()) { + status = TcpAnnotateError(absl::UnknownError("Shutting down endpoint")); } incoming_buffer_->Clear(); last_read_buffer_.Clear(); } - absl::AnyInvocable cb = std::move(read_cb_); - read_cb_ = nullptr; - incoming_buffer_ = nullptr; - read_mu_.Unlock(); + return true; +} + +void PosixEndpointImpl::HandleRead(absl::Status status) { + bool ret = false; + absl::AnyInvocable cb = nullptr; + grpc_core::EnsureRunInExecCtx([&, this]() mutable { + grpc_core::MutexLock lock(&read_mu_); + ret = HandleReadLocked(status); + if (ret) { + cb = std::move(read_cb_); + read_cb_ = nullptr; + incoming_buffer_ = nullptr; + } + }); + if (!ret) { + handle_->NotifyOnRead(on_read_); + return; + } cb(status); Unref(); } diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index 8c2fae5ecf5..3e8f6162601 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -503,7 +503,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted { void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); void HandleWrite(absl::Status status); void HandleError(absl::Status status); - void HandleRead(absl::Status status); + void HandleRead(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS; + bool HandleReadLocked(absl::Status& status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); void FinishEstimate(); diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 98f7a6783e4..59facad83b5 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -15,6 +15,7 @@ #include #include "src/core/lib/event_engine/posix.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -24,6 +25,7 @@ #include // IWYU pragma: keep #include +#include #include #include "absl/functional/any_invocable.h" @@ -197,15 +199,20 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( listener_->memory_allocator_factory_->CreateMemoryAllocator( absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)), /*options=*/listener_->options_); - // Call on_accept_ and then resume accepting new connections by continuing - // the parent for-loop. - listener_->on_accept_( - /*listener_fd=*/handle_->WrappedFd(), /*endpoint=*/std::move(endpoint), - /*is_external=*/false, - /*memory_allocator=*/ - listener_->memory_allocator_factory_->CreateMemoryAllocator( - absl::StrCat("on-accept-tcp-server-connection: ", *peer_name)), - /*pending_data=*/nullptr); + + grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name), + endpoint = std::move(endpoint)]() mutable { + // Call on_accept_ and then resume accepting new connections + // by continuing the parent for-loop. + listener_->on_accept_( + /*listener_fd=*/handle_->WrappedFd(), + /*endpoint=*/std::move(endpoint), + /*is_external=*/false, + /*memory_allocator=*/ + listener_->memory_allocator_factory_->CreateMemoryAllocator( + absl::StrCat("on-accept-tcp-server-connection: ", peer_name)), + /*pending_data=*/nullptr); + }); } GPR_UNREACHABLE_CODE(return); } @@ -228,21 +235,24 @@ absl::Status PosixEngineListenerImpl::HandleExternalConnection( absl::StrCat("HandleExternalConnection: peer not connected: ", peer_name.status().ToString())); } - auto endpoint = CreatePosixEndpoint( - /*handle=*/poller_->CreateHandle(fd, *peer_name, - poller_->CanTrackErrors()), - /*on_shutdown=*/nullptr, /*engine=*/engine_, - /*allocator=*/ - memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( - "external:endpoint-tcp-server-connection: ", *peer_name)), - /*options=*/options_); - on_accept_( - /*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint), - /*is_external=*/true, - /*memory_allocator=*/ - memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( - "external:on-accept-tcp-server-connection: ", *peer_name)), - /*pending_data=*/pending_data); + grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name), + pending_data, listener_fd, fd]() mutable { + auto endpoint = CreatePosixEndpoint( + /*handle=*/poller_->CreateHandle(fd, peer_name, + poller_->CanTrackErrors()), + /*on_shutdown=*/nullptr, /*engine=*/engine_, + /*allocator=*/ + memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( + "external:endpoint-tcp-server-connection: ", peer_name)), + /*options=*/options_); + on_accept_( + /*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint), + /*is_external=*/true, + /*memory_allocator=*/ + memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( + "external:on-accept-tcp-server-connection: ", peer_name)), + /*pending_data=*/pending_data); + }); return absl::OkStatus(); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 230af79699d..20d9ebe04e1 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -329,6 +329,17 @@ class ApplicationCallbackExecCtx { static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_; }; +template +void EnsureRunInExecCtx(F f) { + if (ExecCtx::Get() == nullptr) { + ApplicationCallbackExecCtx app_ctx; + ExecCtx exec_ctx; + f(); + } else { + f(); + } +} + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 3b732122528..4664d1a10d2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -729,6 +729,9 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) { grpc_tcp* tcp = reinterpret_cast(ep); ZerocopyDisableAndWaitForRemaining(tcp); grpc_fd_shutdown(tcp->em_fd, why); + tcp->read_mu.Lock(); + tcp->memory_owner.Reset(); + tcp->read_mu.Unlock(); } static void tcp_free(grpc_tcp* tcp) { @@ -775,6 +778,9 @@ static void tcp_destroy(grpc_endpoint* ep) { gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } + tcp->read_mu.Lock(); + tcp->memory_owner.Reset(); + tcp->read_mu.Unlock(); TCP_UNREF(tcp, "destroy"); } @@ -795,11 +801,14 @@ static void maybe_post_reclaimer(grpc_tcp* tcp) ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { if (!tcp->has_posted_reclaimer) { tcp->has_posted_reclaimer = true; + TCP_REF(tcp, "posted_reclaimer"); tcp->memory_owner.PostReclaimer( grpc_core::ReclamationPass::kBenign, [tcp](absl::optional sweep) { - if (!sweep.has_value()) return; - perform_reclamation(tcp); + if (sweep.has_value()) { + perform_reclamation(tcp); + } + TCP_UNREF(tcp, "posted_reclaimer"); }); } } @@ -1088,7 +1097,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { } tcp->read_mu.Lock(); grpc_error_handle tcp_read_error; - if (GPR_LIKELY(error.ok())) { + if (GPR_LIKELY(error.ok()) && tcp->memory_owner.is_valid()) { maybe_make_read_slices(tcp); if (!tcp_do_read(tcp, &tcp_read_error)) { // Maybe update rcv lowat value based on the number of bytes read in this @@ -1101,7 +1110,12 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { } tcp_trace_read(tcp, tcp_read_error); } else { - tcp_read_error = error; + if (!tcp->memory_owner.is_valid() && error.ok()) { + tcp_read_error = + tcp_annotate_error(absl::InternalError("Socket closed"), tcp); + } else { + tcp_read_error = error; + } grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); } @@ -2031,6 +2045,9 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } + tcp->read_mu.Lock(); + tcp->memory_owner.Reset(); + tcp->read_mu.Unlock(); TCP_UNREF(tcp, "destroy"); } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index f29027c34ed..69b01b60d64 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -1008,3 +1008,21 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "resource_quota_end2end_stress_test", + srcs = ["resource_quota_end2end_stress_test.cc"], + external_deps = [ + "gtest", + "absl/strings", + "absl/time", + ], + deps = [ + "//:grpc++", + "//src/core:experiments", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/end2end/resource_quota_end2end_stress_test.cc b/test/cpp/end2end/resource_quota_end2end_stress_test.cc new file mode 100644 index 00000000000..c475fa1ab68 --- /dev/null +++ b/test/cpp/end2end/resource_quota_end2end_stress_test.cc @@ -0,0 +1,154 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include + +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/time/time.h" + +#include +#include +#include +#include + +#include "src/core/lib/experiments/config.h" +#include "src/core/lib/gprpp/notification.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +// A stress test which spins up a server with a small configured resource quota +// value. It then creates many channels which exchange large payloads with the +// server. This would drive the server to reach resource quota limits and +// trigger reclamation. + +namespace grpc { +namespace testing { +namespace { +constexpr int kResourceQuotaSizeBytes = 1024 * 1024; +constexpr int kPayloadSizeBytes = 1024 * 1024; +constexpr int kNumParallelChannels = 1024; +} // namespace + +class EchoClientUnaryReactor : public grpc::ClientUnaryReactor { + public: + EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub, + const std::string payload, Status* status) + : ctx_(ctx), payload_(payload), status_(status) { + ctx_->set_wait_for_ready(true); + request_.set_message(payload); + stub->async()->Echo(ctx_, &request_, &response_, this); + StartCall(); + } + + void Await() { notification_.WaitForNotification(); } + + protected: + void OnReadInitialMetadataDone(bool /*ok*/) override {} + + void OnDone(const Status& s) override { + *status_ = s; + notification_.Notify(); + } + + private: + ClientContext* const ctx_; + EchoRequest request_; + EchoResponse response_; + const std::string payload_; + grpc_core::Notification notification_; + Status* const status_; +}; + +class EchoServerUnaryReactor : public ServerUnaryReactor { + public: + EchoServerUnaryReactor(CallbackServerContext* /*ctx*/, + const EchoRequest* request, EchoResponse* response) { + response->set_message(request->message()); + Finish(grpc::Status::OK); + } + + private: + void OnDone() override { delete this; } +}; + +class GrpcCallbackServiceImpl : public EchoTestService::CallbackService { + public: + ServerUnaryReactor* Echo(CallbackServerContext* context, + const EchoRequest* request, + EchoResponse* response) override { + return new EchoServerUnaryReactor(context, request, response); + } +}; + +class End2EndResourceQuotaUnaryTest : public ::testing::Test { + protected: + End2EndResourceQuotaUnaryTest() { + int port = grpc_pick_unused_port_or_die(); + server_address_ = absl::StrCat("localhost:", port); + payload_ = std::string(kPayloadSizeBytes, 'a'); + ServerBuilder builder; + builder.AddListeningPort(server_address_, InsecureServerCredentials()); + builder.SetResourceQuota( + grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes)); + builder.RegisterService(&grpc_service_); + server_ = builder.BuildAndStart(); + } + + ~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); } + + void MakeGrpcCall() { + ClientContext ctx; + Status status; + auto stub = EchoTestService::NewStub( + CreateChannel(server_address_, grpc::InsecureChannelCredentials())); + EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status); + reactor.Await(); + } + + void MakeGrpcCalls() { + std::vector workers; + workers.reserve(kNumParallelChannels); + // Run MakeGrpcCall() many times concurrently. + for (int i = 0; i < kNumParallelChannels; ++i) { + workers.emplace_back([this]() { MakeGrpcCall(); }); + } + for (int i = 0; i < kNumParallelChannels; ++i) { + workers[i].join(); + } + } + + int port_; + std::unique_ptr server_; + string server_address_; + GrpcCallbackServiceImpl grpc_service_; + std::string payload_; +}; + +TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) { MakeGrpcCalls(); } + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 08518e881ff..6c98574c118 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7113,6 +7113,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "resource_quota_end2end_stress_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,