diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 7ac39d01cff..d55590e30e0 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -452,11 +452,13 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( OrphanablePtr handshaking_state_ref; RefCountedPtr handshake_mgr; bool cleanup_connection = false; + bool release_connection = false; { MutexLock connection_lock(&self->connection_->mu_); if (!error.ok() || self->connection_->shutdown_) { std::string error_str = StatusToString(error); cleanup_connection = true; + release_connection = true; if (error.ok() && args->endpoint != nullptr) { // We were shut down or stopped serving after handshaking completed // successfully, so destroy the endpoint here. @@ -540,9 +542,11 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( grpc_slice_buffer_destroy(args->read_buffer); gpr_free(args->read_buffer); cleanup_connection = true; + release_connection = true; } } else { cleanup_connection = true; + release_connection = true; } } // Since the handshake manager is done, the connection no longer needs to @@ -557,6 +561,9 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( OrphanablePtr connection; if (cleanup_connection) { MutexLock listener_lock(&self->connection_->listener_->mu_); + if (release_connection) { + self->connection_->listener_->connection_quota_->ReleaseConnections(1); + } auto it = self->connection_->listener_->connections_.find( self->connection_.get()); if (it != self->connection_->listener_->connections_.end()) { diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index f0f5305a67d..1a063cfe853 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -1030,7 +1030,9 @@ grpc_cc_test( ], deps = [ "//:grpc++", + "//src/core:event_engine_tcp_socket_utils", "//src/core:experiments", + "//src/core:grpc_fake_credentials", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//test/core/util:grpc_test_util", diff --git a/test/cpp/end2end/resource_quota_end2end_stress_test.cc b/test/cpp/end2end/resource_quota_end2end_stress_test.cc index 4a8f28deaeb..19c9a9ad6c6 100644 --- a/test/cpp/end2end/resource_quota_end2end_stress_test.cc +++ b/test/cpp/end2end/resource_quota_end2end_stress_test.cc @@ -24,17 +24,24 @@ #include "absl/strings/str_cat.h" #include "absl/time/time.h" +#include #include #include #include #include +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/experiments/config.h" #include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/cpp/client/secure_credentials.h" +#include "src/cpp/server/secure_server_credentials.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +// IWYU pragma: no_include + // A stress test which spins up a server with a small configured resource quota // value. It then creates many channels which exchange large payloads with the // server. This would drive the server to reach resource quota limits and @@ -150,9 +157,14 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam { End2EndConnectionQuotaTest() { port_ = grpc_pick_unused_port_or_die(); server_address_ = absl::StrCat("[::]:", port_); + connect_address_ = absl::StrCat("ipv6:[::1]:", port_); payload_ = std::string(kPayloadSizeBytes, 'a'); ServerBuilder builder; - builder.AddListeningPort(server_address_, InsecureServerCredentials()); + builder.AddListeningPort( + server_address_, + std::make_shared( + grpc_fake_transport_security_server_credentials_create())); + builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000); builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS, GetParam()); builder.AddChannelArgument( @@ -172,13 +184,50 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam { args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000); args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); - return EchoTestService::NewStub( - CreateCustomChannel(absl::StrCat("ipv6:[::1]:", port_), - grpc::InsecureChannelCredentials(), args)); + return EchoTestService::NewStub(CreateCustomChannel( + connect_address_, + std::make_shared( + grpc_fake_transport_security_credentials_create()), + args)); } void TestExceedingConnectionQuota() { const int kNumConnections = 2 * GetParam(); +#ifdef GPR_LINUX + // On linux systems create 2 * NumConnection tcp connections which don't + // do anything and verify that they get closed after + // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS seconds. + auto connect_address_resolved = + grpc_event_engine::experimental::URIToResolvedAddress(connect_address_); + std::vector workers; + workers.reserve(kNumConnections); + for (int i = 0; i < kNumConnections; ++i) { + workers.emplace_back([connect_address_resolved]() { + int client_fd; + int one = 1; + char buf[1024]; + client_fd = socket(AF_INET6, SOCK_STREAM, 0); + setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + // Connection should succeed. + EXPECT_EQ(connect(client_fd, + const_cast( + connect_address_resolved->address()), + connect_address_resolved->size()), + 0); + // recv should not block forever and it should return because + // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is set and the server should + // close this connections after that timeout expires. + while (recv(client_fd, buf, 1024, 0) > 0) { + } + close(client_fd); + }); + } + for (int i = 0; i < kNumConnections; ++i) { + workers[i].join(); + } +#endif + // Subsequent kNumConnections / 2 RPCs should succeed because the previously + // spawned client connections have been closed. std::vector> stubs; stubs.reserve(kNumConnections); for (int i = 0; i < kNumConnections; i++) { @@ -206,6 +255,7 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam { int port_; std::unique_ptr server_; string server_address_; + string connect_address_; GrpcCallbackServiceImpl grpc_service_; std::string payload_; };