[chttp2-server] Handle error cases correctly when limiting number of accepted connections (#36101)

Closes #36101

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36101 from Vignesh2208:server-fix 41d6992b5c
PiperOrigin-RevId: 615480491
Ref: pull/36114/head
Author: Vignesh Babu (committed by Copybara-Service)
Parent: 88585c43e9
Commit: 9ed686902e

Files changed:
  1. src/core/ext/transport/chttp2/server/chttp2_server.cc (7)
  2. test/cpp/end2end/BUILD (2)
  3. test/cpp/end2end/resource_quota_end2end_stress_test.cc (58)
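
For context on what this fix guards against: the listener counts accepted connections against a quota configured via GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS, and a connection whose handshake fails must return its slot, otherwise the limit is slowly eaten by connections that no longer exist. Below is a minimal, self-contained sketch of that idea, not gRPC's actual implementation; ConnectionQuotaSketch, TryAcquire, and remaining are invented for illustration, and only ReleaseConnections(1) mirrors a name visible in the diff that follows.

#include <atomic>
#include <cstdio>

// Hypothetical stand-in for the listener's connection quota. Not gRPC code.
class ConnectionQuotaSketch {
 public:
  explicit ConnectionQuotaSketch(int max_connections)
      : remaining_(max_connections) {}

  // Consumes one slot if any are left; returns false when the limit is hit.
  bool TryAcquire() {
    int cur = remaining_.load();
    while (cur > 0) {
      if (remaining_.compare_exchange_weak(cur, cur - 1)) return true;
    }
    return false;
  }

  // Returns n slots. The bug class this commit addresses is an error path
  // that cleaned up a failed connection without ever calling this.
  void ReleaseConnections(int n) { remaining_.fetch_add(n); }

  int remaining() const { return remaining_.load(); }

 private:
  std::atomic<int> remaining_;
};

int main() {
  ConnectionQuotaSketch quota(/*max_connections=*/2);
  if (quota.TryAcquire()) {
    bool handshake_ok = false;  // simulate a handshake timeout or bad client
    if (!handshake_ok) {
      // Without this release the slot is lost for the server's lifetime.
      quota.ReleaseConnections(1);
    }
  }
  std::printf("slots remaining: %d\n", quota.remaining());  // prints 2
  return 0;
}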

src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -452,11 +452,13 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
OrphanablePtr<HandshakingState> handshaking_state_ref;
RefCountedPtr<HandshakeManager> handshake_mgr;
bool cleanup_connection = false;
bool release_connection = false;
{
MutexLock connection_lock(&self->connection_->mu_);
if (!error.ok() || self->connection_->shutdown_) {
std::string error_str = StatusToString(error);
cleanup_connection = true;
release_connection = true;
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
@@ -540,9 +542,11 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
cleanup_connection = true;
release_connection = true;
}
} else {
cleanup_connection = true;
release_connection = true;
}
}
// Since the handshake manager is done, the connection no longer needs to
@@ -557,6 +561,9 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
OrphanablePtr<ActiveConnection> connection;
if (cleanup_connection) {
MutexLock listener_lock(&self->connection_->listener_->mu_);
if (release_connection) {
self->connection_->listener_->connection_quota_->ReleaseConnections(1);
}
auto it = self->connection_->listener_->connections_.find(
self->connection_.get());
if (it != self->connection_->listener_->connections_.end()) {
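
The hunk above ends mid-block, so here is a compressed sketch of the control flow this change establishes, with invented names and deliberately simplified locking (this is not the real Chttp2ServerListener code): the decision to release is recorded in a separate local flag while the connection's mutex is held, and the actual ReleaseConnections(1) happens later under the listener's mutex, next to where the connection is removed from the listener's map. The comment about slot ownership moving elsewhere is an assumption on my part, not something the diff states.

#include <mutex>

// Illustrative only; names and locking are simplified, not the actual
// Chttp2ServerListener internals.
struct ListenerSketch {
  std::mutex mu;
  int admitted = 0;  // stands in for the quota's in-use count
  void ReleaseConnections(int n) { admitted -= n; }
};

void OnHandshakeDoneSketch(ListenerSketch& listener, std::mutex& connection_mu,
                           bool handshake_ok, bool shutdown) {
  bool cleanup_connection = false;
  bool release_connection = false;
  {
    std::lock_guard<std::mutex> lock(connection_mu);
    if (!handshake_ok || shutdown) {
      // Every path that abandons the connection must also give its quota
      // slot back; missing one of these is the leak being fixed.
      cleanup_connection = true;
      release_connection = true;
    }
    // Other branches may set cleanup_connection without release_connection
    // when ownership of the slot has already moved elsewhere (assumption).
  }
  if (cleanup_connection) {
    std::lock_guard<std::mutex> lock(listener.mu);
    if (release_connection) listener.ReleaseConnections(1);
    // ...then remove the connection from the listener's map, as in the diff.
  }
}

int main() {
  ListenerSketch listener;
  std::mutex connection_mu;
  listener.admitted = 1;  // one connection was counted against the quota
  OnHandshakeDoneSketch(listener, connection_mu, /*handshake_ok=*/false,
                        /*shutdown=*/false);
  // listener.admitted is back to 0: the slot was returned exactly once.
  return 0;
}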

test/cpp/end2end/BUILD
@@ -1030,7 +1030,9 @@ grpc_cc_test(
],
deps = [
"//:grpc++",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:experiments",
"//src/core:grpc_fake_credentials",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",

test/cpp/end2end/resource_quota_end2end_stress_test.cc
@@ -24,17 +24,24 @@
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/server_callback.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
// IWYU pragma: no_include <sys/socket.h>
// A stress test which spins up a server with a small configured resource quota
// value. It then creates many channels which exchange large payloads with the
// server. This would drive the server to reach resource quota limits and
@@ -150,9 +157,14 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
End2EndConnectionQuotaTest() {
port_ = grpc_pick_unused_port_or_die();
server_address_ = absl::StrCat("[::]:", port_);
connect_address_ = absl::StrCat("ipv6:[::1]:", port_);
payload_ = std::string(kPayloadSizeBytes, 'a');
ServerBuilder builder;
builder.AddListeningPort(server_address_, InsecureServerCredentials());
builder.AddListeningPort(
server_address_,
std::make_shared<SecureServerCredentials>(
grpc_fake_transport_security_server_credentials_create()));
builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS,
GetParam());
builder.AddChannelArgument(
@@ -172,13 +184,50 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000);
args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
return EchoTestService::NewStub(
CreateCustomChannel(absl::StrCat("ipv6:[::1]:", port_),
grpc::InsecureChannelCredentials(), args));
return EchoTestService::NewStub(CreateCustomChannel(
connect_address_,
std::make_shared<SecureChannelCredentials>(
grpc_fake_transport_security_credentials_create()),
args));
}
void TestExceedingConnectionQuota() {
const int kNumConnections = 2 * GetParam();
#ifdef GPR_LINUX
// On Linux, create kNumConnections (twice the configured connection quota)
// raw TCP connections which do nothing, and verify that the server closes
// them once the GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS timeout (in
// milliseconds) expires.
auto connect_address_resolved =
grpc_event_engine::experimental::URIToResolvedAddress(connect_address_);
std::vector<std::thread> workers;
workers.reserve(kNumConnections);
for (int i = 0; i < kNumConnections; ++i) {
workers.emplace_back([connect_address_resolved]() {
int client_fd;
int one = 1;
char buf[1024];
client_fd = socket(AF_INET6, SOCK_STREAM, 0);
setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
// Connection should succeed.
EXPECT_EQ(connect(client_fd,
const_cast<struct sockaddr*>(
connect_address_resolved->address()),
connect_address_resolved->size()),
0);
// recv should not block forever; it should return once the server closes
// this connection after the GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS timeout
// expires.
while (recv(client_fd, buf, 1024, 0) > 0) {
}
close(client_fd);
});
}
for (int i = 0; i < kNumConnections; ++i) {
workers[i].join();
}
#endif
// Subsequent kNumConnections / 2 RPCs should succeed because the previously
// spawned client connections have been closed.
std::vector<std::unique_ptr<EchoTestService::Stub>> stubs;
stubs.reserve(kNumConnections);
for (int i = 0; i < kNumConnections; i++) {
@@ -206,6 +255,7 @@ class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
int port_;
std::unique_ptr<Server> server_;
string server_address_;
string connect_address_;
GrpcCallbackServiceImpl grpc_service_;
std::string payload_;
};
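
For readers who want to reproduce the server side of this test outside the gRPC tree, here is a standalone sketch of the relevant ServerBuilder configuration, using only public grpc++ APIs and the channel-arg names that appear in the diff; the insecure credentials, fixed port, quota value of 16, and the missing service registration are simplifications for illustration, not what the test actually does.

#include <memory>
#include <string>

#include <grpc/grpc.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/security/server_credentials.h>

int main() {
  const std::string address = "[::]:50051";
  grpc::ServerBuilder builder;
  builder.AddListeningPort(address, grpc::InsecureServerCredentials());
  // Cap the number of concurrently accepted connections. With this commit,
  // slots consumed by connections whose handshake fails are returned, so the
  // cap is not silently eroded over time.
  builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS, 16);
  // Close connections that have not completed the handshake within 1000 ms;
  // the test relies on this to reclaim the idle raw TCP connections it opens.
  builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
  if (server == nullptr) return 1;
  // ...register a service and call server->Wait() in a real server.
  server->Shutdown();
  return 0;
}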
