Revert "Revert "[resource quota] Fix bugs in iomgr and event engine endpoint interactions with resource quota"" (#33499)

Reverts grpc/grpc#33417

The deadlock reported in https://fusion2.corp.google.com/invocations/99834386-79ff-4707-86eb-52e604774ea9/details is fixed in the c9a1bdc3dc commit.
Branch: pull/33508/head
Author: Vignesh Babu (committed by GitHub)
Parent: 2c336ac7fd
Commit: cd4ff81b3f
Changed files (lines changed):
  1. CMakeLists.txt (54)
  2. build_autogenerated.yaml (13)
  3. src/core/BUILD (2)
  4. src/core/lib/event_engine/posix_engine/posix_endpoint.cc (36)
  5. src/core/lib/event_engine/posix_engine/posix_endpoint.h (4)
  6. src/core/lib/event_engine/posix_engine/posix_engine_listener.cc (58)
  7. src/core/lib/iomgr/exec_ctx.h (11)
  8. src/core/lib/iomgr/tcp_posix.cc (25)
  9. test/cpp/end2end/BUILD (18)
  10. test/cpp/end2end/resource_quota_end2end_stress_test.cc (154)
  11. tools/run_tests/generated/tests.json (24)

CMakeLists.txt (generated)

@@ -1187,6 +1187,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx resolve_address_using_native_resolver_posix_test)
endif()
add_dependencies(buildtests_cxx resolve_address_using_native_resolver_test)
add_dependencies(buildtests_cxx resource_quota_end2end_stress_test)
add_dependencies(buildtests_cxx resource_quota_server_test)
add_dependencies(buildtests_cxx resource_quota_test)
add_dependencies(buildtests_cxx retry_cancel_after_first_attempt_starts_test)
@@ -19886,6 +19887,59 @@ target_link_libraries(resolve_address_using_native_resolver_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(resource_quota_end2end_stress_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
test/cpp/end2end/resource_quota_end2end_stress_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(resource_quota_end2end_stress_test PUBLIC cxx_std_14)
target_include_directories(resource_quota_end2end_stress_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(resource_quota_end2end_stress_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
)
endif()
if(gRPC_BUILD_TESTS)

build_autogenerated.yaml
@@ -12074,6 +12074,19 @@ targets:
deps:
- grpc_test_util
- grpc++_test_config
- name: resource_quota_end2end_stress_test
gtest: true
build: test
language: c++
headers: []
src:
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- test/cpp/end2end/resource_quota_end2end_stress_test.cc
deps:
- grpc++_test_util
- name: resource_quota_server_test
gtest: true
build: test

src/core/BUILD
@@ -1841,6 +1841,7 @@ grpc_cc_library(
"time",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
"//:grpc_public_hdrs",
"//:ref_counted_ptr",
@@ -1942,6 +1943,7 @@ grpc_cc_library(
"socket_mutator",
"status_helper",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
],
)

src/core/lib/event_engine/posix_engine/posix_endpoint.cc
@@ -49,7 +49,9 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
@@ -549,28 +551,40 @@ void PosixEndpointImpl::MaybeMakeReadSlices() {
}
}
void PosixEndpointImpl::HandleRead(absl::Status status) {
read_mu_.Lock();
bool PosixEndpointImpl::HandleReadLocked(absl::Status& status) {
if (status.ok() && memory_owner_.is_valid()) {
MaybeMakeReadSlices();
if (!TcpDoRead(status)) {
UpdateRcvLowat();
// We've consumed the edge, request a new one.
read_mu_.Unlock();
handle_->NotifyOnRead(on_read_);
return;
return false;
}
} else {
if (!memory_owner_.is_valid()) {
status = absl::UnknownError("Shutting down endpoint");
if (!memory_owner_.is_valid() && status.ok()) {
status = TcpAnnotateError(absl::UnknownError("Shutting down endpoint"));
}
incoming_buffer_->Clear();
last_read_buffer_.Clear();
}
absl::AnyInvocable<void(absl::Status)> cb = std::move(read_cb_);
read_cb_ = nullptr;
incoming_buffer_ = nullptr;
read_mu_.Unlock();
return true;
}
void PosixEndpointImpl::HandleRead(absl::Status status) {
bool ret = false;
absl::AnyInvocable<void(absl::Status)> cb = nullptr;
grpc_core::EnsureRunInExecCtx([&, this]() mutable {
grpc_core::MutexLock lock(&read_mu_);
ret = HandleReadLocked(status);
if (ret) {
cb = std::move(read_cb_);
read_cb_ = nullptr;
incoming_buffer_ = nullptr;
}
});
if (!ret) {
handle_->NotifyOnRead(on_read_);
return;
}
cb(status);
Unref();
}
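
The refactored HandleRead above follows a common handoff pattern: enter an ExecCtx, take read_mu, do the locked bookkeeping in HandleReadLocked, move the user callback out of the guarded fields, and only invoke it after the lock is released. The following standalone sketch (not part of this commit) illustrates that pattern, with std::mutex and std::function standing in for grpc_core::Mutex and absl::AnyInvocable:

#include <functional>
#include <iostream>
#include <mutex>
#include <utility>

// Simplified illustration of "extract the callback under the lock, invoke it
// after unlocking", mirroring the shape of HandleRead/HandleReadLocked above.
class Endpoint {
 public:
  void SetReadCallback(std::function<void(bool)> cb) {
    std::lock_guard<std::mutex> lock(mu_);
    read_cb_ = std::move(cb);
  }

  void HandleRead(bool ok) {
    std::function<void(bool)> cb;
    {
      std::lock_guard<std::mutex> lock(mu_);
      // Locked bookkeeping goes here; the callback is then moved out so it
      // can run without the lock held (it may immediately re-arm a read).
      cb = std::move(read_cb_);
      read_cb_ = nullptr;
    }
    if (cb) cb(ok);
  }

 private:
  std::mutex mu_;
  std::function<void(bool)> read_cb_;
};

int main() {
  Endpoint ep;
  ep.SetReadCallback([](bool ok) { std::cout << "read completed: " << ok << "\n"; });
  ep.HandleRead(true);
  return 0;
}

Running the callback after read_mu is dropped matters because the callback may immediately start another read on the same endpoint.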

src/core/lib/event_engine/posix_engine/posix_endpoint.h
@@ -503,7 +503,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void HandleWrite(absl::Status status);
void HandleError(absl::Status status);
void HandleRead(absl::Status status);
void HandleRead(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
bool HandleReadLocked(absl::Status& status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void FinishEstimate();

src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
@@ -15,6 +15,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
@@ -24,6 +25,7 @@
#include <unistd.h> // IWYU pragma: keep
#include <string>
#include <type_traits>
#include <utility>
#include "absl/functional/any_invocable.h"
@@ -197,15 +199,20 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)),
/*options=*/listener_->options_);
// Call on_accept_ and then resume accepting new connections by continuing
// the parent for-loop.
listener_->on_accept_(
/*listener_fd=*/handle_->WrappedFd(), /*endpoint=*/std::move(endpoint),
/*is_external=*/false,
/*memory_allocator=*/
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("on-accept-tcp-server-connection: ", *peer_name)),
/*pending_data=*/nullptr);
grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
endpoint = std::move(endpoint)]() mutable {
// Call on_accept_ and then resume accepting new connections
// by continuing the parent for-loop.
listener_->on_accept_(
/*listener_fd=*/handle_->WrappedFd(),
/*endpoint=*/std::move(endpoint),
/*is_external=*/false,
/*memory_allocator=*/
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("on-accept-tcp-server-connection: ", peer_name)),
/*pending_data=*/nullptr);
});
}
GPR_UNREACHABLE_CODE(return);
}
@@ -228,21 +235,24 @@ absl::Status PosixEngineListenerImpl::HandleExternalConnection(
absl::StrCat("HandleExternalConnection: peer not connected: ",
peer_name.status().ToString()));
}
auto endpoint = CreatePosixEndpoint(
/*handle=*/poller_->CreateHandle(fd, *peer_name,
poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/engine_,
/*allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:endpoint-tcp-server-connection: ", *peer_name)),
/*options=*/options_);
on_accept_(
/*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint),
/*is_external=*/true,
/*memory_allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:on-accept-tcp-server-connection: ", *peer_name)),
/*pending_data=*/pending_data);
grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
pending_data, listener_fd, fd]() mutable {
auto endpoint = CreatePosixEndpoint(
/*handle=*/poller_->CreateHandle(fd, peer_name,
poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/engine_,
/*allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:endpoint-tcp-server-connection: ", peer_name)),
/*options=*/options_);
on_accept_(
/*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint),
/*is_external=*/true,
/*memory_allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:on-accept-tcp-server-connection: ", peer_name)),
/*pending_data=*/pending_data);
});
return absl::OkStatus();
}

src/core/lib/iomgr/exec_ctx.h
@@ -329,6 +329,17 @@ class ApplicationCallbackExecCtx {
static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_;
};
template <typename F>
void EnsureRunInExecCtx(F f) {
if (ExecCtx::Get() == nullptr) {
ApplicationCallbackExecCtx app_ctx;
ExecCtx exec_ctx;
f();
} else {
f();
}
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
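
The new EnsureRunInExecCtx helper runs its functor directly when the calling thread already has an ExecCtx, and otherwise stacks an ApplicationCallbackExecCtx and ExecCtx around the call so that work queued by the functor is flushed when the scope unwinds. A standalone sketch of that idea (not part of this commit; the ExecCtx below is a thread_local stand-in, not the real grpc_core::ExecCtx):

#include <iostream>

// Stand-in context: records itself in a thread_local on construction and
// "flushes" pending work on destruction, loosely like the real ExecCtx.
class ExecCtx {
 public:
  ExecCtx() { current_ = this; }
  ~ExecCtx() {
    std::cout << "flushing pending work at ExecCtx destruction\n";
    current_ = nullptr;
  }
  static ExecCtx* Get() { return current_; }

 private:
  static thread_local ExecCtx* current_;
};
thread_local ExecCtx* ExecCtx::current_ = nullptr;

template <typename F>
void EnsureRunInExecCtx(F f) {
  if (ExecCtx::Get() == nullptr) {
    ExecCtx exec_ctx;  // Created here, so f()'s queued work gets flushed.
    f();
  } else {
    f();  // Reuse the caller's ExecCtx; the caller flushes later.
  }
}

int main() {
  EnsureRunInExecCtx([] { std::cout << "no ExecCtx yet: one was created\n"; });
  ExecCtx outer;
  EnsureRunInExecCtx([] { std::cout << "reusing the caller's ExecCtx\n"; });
  return 0;
}

In this commit the helper wraps the endpoint's HandleRead path and the listener's on_accept_ invocations, both of which can be reached from threads that do not already have an ExecCtx on their stack.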

src/core/lib/iomgr/tcp_posix.cc
@@ -729,6 +729,9 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, why);
tcp->read_mu.Lock();
tcp->memory_owner.Reset();
tcp->read_mu.Unlock();
}
static void tcp_free(grpc_tcp* tcp) {
@@ -775,6 +778,9 @@ static void tcp_destroy(grpc_endpoint* ep) {
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}
tcp->read_mu.Lock();
tcp->memory_owner.Reset();
tcp->read_mu.Unlock();
TCP_UNREF(tcp, "destroy");
}
@@ -795,11 +801,14 @@ static void maybe_post_reclaimer(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
if (!tcp->has_posted_reclaimer) {
tcp->has_posted_reclaimer = true;
TCP_REF(tcp, "posted_reclaimer");
tcp->memory_owner.PostReclaimer(
grpc_core::ReclamationPass::kBenign,
[tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
if (!sweep.has_value()) return;
perform_reclamation(tcp);
if (sweep.has_value()) {
perform_reclamation(tcp);
}
TCP_UNREF(tcp, "posted_reclaimer");
});
}
}
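
Besides tolerating a cancelled sweep, the hunk above takes a TCP_REF when the reclaimer is posted and releases it inside the callback on both the swept and the cancelled path, so the grpc_tcp cannot be freed while a posted reclaimer still points at it. A standalone sketch of that ownership idea (not part of this commit), with std::shared_ptr standing in for the manual TCP_REF/TCP_UNREF counting:

#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

// Simplified illustration: the object must stay alive until the posted
// reclaimer runs, even when the reclaimer decides there is nothing to do.
struct Tcp {
  ~Tcp() { std::cout << "tcp destroyed\n"; }
  void PerformReclamation() { std::cout << "reclaiming read buffers\n"; }
};

std::vector<std::function<void(std::optional<int>)>> g_pending_reclaimers;

void MaybePostReclaimer(const std::shared_ptr<Tcp>& tcp) {
  // Capturing the shared_ptr plays the role of TCP_REF(tcp, "posted_reclaimer").
  g_pending_reclaimers.push_back([tcp](std::optional<int> sweep) {
    if (sweep.has_value()) tcp->PerformReclamation();
    // The captured shared_ptr is released when the lambda is destroyed,
    // mirroring TCP_UNREF running on both the sweep and the cancel path.
  });
}

int main() {
  auto tcp = std::make_shared<Tcp>();
  MaybePostReclaimer(tcp);
  tcp.reset();  // The endpoint's own reference goes away first.
  for (auto& cb : g_pending_reclaimers) cb(std::nullopt);  // Cancelled sweep.
  g_pending_reclaimers.clear();  // Last reference drops; tcp destroyed here.
  return 0;
}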
@@ -1088,7 +1097,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
}
tcp->read_mu.Lock();
grpc_error_handle tcp_read_error;
if (GPR_LIKELY(error.ok())) {
if (GPR_LIKELY(error.ok()) && tcp->memory_owner.is_valid()) {
maybe_make_read_slices(tcp);
if (!tcp_do_read(tcp, &tcp_read_error)) {
// Maybe update rcv lowat value based on the number of bytes read in this
@@ -1101,7 +1110,12 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
}
tcp_trace_read(tcp, tcp_read_error);
} else {
tcp_read_error = error;
if (!tcp->memory_owner.is_valid() && error.ok()) {
tcp_read_error =
tcp_annotate_error(absl::InternalError("Socket closed"), tcp);
} else {
tcp_read_error = error;
}
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
}
@@ -2031,6 +2045,9 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}
tcp->read_mu.Lock();
tcp->memory_owner.Reset();
tcp->read_mu.Unlock();
TCP_UNREF(tcp, "destroy");
}
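
Across tcp_shutdown, tcp_destroy, and grpc_tcp_destroy_and_release_fd, the memory owner is now reset while holding read_mu, and tcp_handle_read checks memory_owner.is_valid() under the same mutex, so a read racing with shutdown fails with a "Socket closed" error instead of allocating slices from a released quota. A standalone sketch of that check (not part of this commit), with std::optional standing in for the memory owner:

#include <iostream>
#include <mutex>
#include <optional>
#include <string>

// Simplified illustration of the shutdown/read race handled above: shutdown
// drops the memory owner under read_mu, and the read path checks validity
// under the same mutex before trying to allocate read slices.
struct Endpoint {
  std::mutex read_mu;
  std::optional<std::string> memory_owner{"quota-backed allocator"};
};

void Shutdown(Endpoint& ep) {
  std::lock_guard<std::mutex> lock(ep.read_mu);
  ep.memory_owner.reset();  // Mirrors tcp->memory_owner.Reset().
}

bool HandleRead(Endpoint& ep, std::string& error) {
  std::lock_guard<std::mutex> lock(ep.read_mu);
  if (!ep.memory_owner.has_value()) {
    error = "Socket closed";  // Fail the read rather than allocate slices.
    return false;
  }
  // ... allocate read slices from *ep.memory_owner and perform the read ...
  return true;
}

int main() {
  Endpoint ep;
  Shutdown(ep);
  std::string error;
  if (!HandleRead(ep, error)) std::cout << "read failed: " << error << "\n";
  return 0;
}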

test/cpp/end2end/BUILD
@@ -1008,3 +1008,21 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "resource_quota_end2end_stress_test",
srcs = ["resource_quota_end2end_stress_test.cc"],
external_deps = [
"gtest",
"absl/strings",
"absl/time",
],
deps = [
"//:grpc++",
"//src/core:experiments",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

test/cpp/end2end/resource_quota_end2end_stress_test.cc (new file)
@@ -0,0 +1,154 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <gtest/gtest.h>
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include <grpc/support/time.h>
#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/server_callback.h>
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
// A stress test which spins up a server with a small configured resource quota
// value. It then creates many channels which exchange large payloads with the
// server. This would drive the server to reach resource quota limits and
// trigger reclamation.
namespace grpc {
namespace testing {
namespace {
constexpr int kResourceQuotaSizeBytes = 1024 * 1024;
constexpr int kPayloadSizeBytes = 1024 * 1024;
constexpr int kNumParallelChannels = 1024;
} // namespace
class EchoClientUnaryReactor : public grpc::ClientUnaryReactor {
public:
EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub,
const std::string payload, Status* status)
: ctx_(ctx), payload_(payload), status_(status) {
ctx_->set_wait_for_ready(true);
request_.set_message(payload);
stub->async()->Echo(ctx_, &request_, &response_, this);
StartCall();
}
void Await() { notification_.WaitForNotification(); }
protected:
void OnReadInitialMetadataDone(bool /*ok*/) override {}
void OnDone(const Status& s) override {
*status_ = s;
notification_.Notify();
}
private:
ClientContext* const ctx_;
EchoRequest request_;
EchoResponse response_;
const std::string payload_;
grpc_core::Notification notification_;
Status* const status_;
};
class EchoServerUnaryReactor : public ServerUnaryReactor {
public:
EchoServerUnaryReactor(CallbackServerContext* /*ctx*/,
const EchoRequest* request, EchoResponse* response) {
response->set_message(request->message());
Finish(grpc::Status::OK);
}
private:
void OnDone() override { delete this; }
};
class GrpcCallbackServiceImpl : public EchoTestService::CallbackService {
public:
ServerUnaryReactor* Echo(CallbackServerContext* context,
const EchoRequest* request,
EchoResponse* response) override {
return new EchoServerUnaryReactor(context, request, response);
}
};
class End2EndResourceQuotaUnaryTest : public ::testing::Test {
protected:
End2EndResourceQuotaUnaryTest() {
int port = grpc_pick_unused_port_or_die();
server_address_ = absl::StrCat("localhost:", port);
payload_ = std::string(kPayloadSizeBytes, 'a');
ServerBuilder builder;
builder.AddListeningPort(server_address_, InsecureServerCredentials());
builder.SetResourceQuota(
grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes));
builder.RegisterService(&grpc_service_);
server_ = builder.BuildAndStart();
}
~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); }
void MakeGrpcCall() {
ClientContext ctx;
Status status;
auto stub = EchoTestService::NewStub(
CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status);
reactor.Await();
}
void MakeGrpcCalls() {
std::vector<std::thread> workers;
workers.reserve(kNumParallelChannels);
// Run MakeGrpcCall() many times concurrently.
for (int i = 0; i < kNumParallelChannels; ++i) {
workers.emplace_back([this]() { MakeGrpcCall(); });
}
for (int i = 0; i < kNumParallelChannels; ++i) {
workers[i].join();
}
}
int port_;
std::unique_ptr<Server> server_;
string server_address_;
GrpcCallbackServiceImpl grpc_service_;
std::string payload_;
};
TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) { MakeGrpcCalls(); }
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

tools/run_tests/generated/tests.json
@@ -7113,6 +7113,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "resource_quota_end2end_stress_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
