mirror of https://github.com/grpc/grpc.git
Reland posix event engine tcp client (#31563)
* Revert "Revert "Posix event engine tcp client (#31074)" (#31452)"
This reverts commit d760c1fef7
.
* regenerate_projects
pull/31568/head
parent
50fbe51238
commit
249645608e
28 changed files with 972 additions and 133 deletions
@ -0,0 +1,222 @@ |
|||||||
|
// Copyright 2022 gRPC Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
#include <errno.h> |
||||||
|
#include <fcntl.h> |
||||||
|
#include <poll.h> |
||||||
|
#include <stdint.h> |
||||||
|
#include <stdlib.h> |
||||||
|
#include <sys/socket.h> |
||||||
|
#include <unistd.h> |
||||||
|
|
||||||
|
#include <algorithm> |
||||||
|
#include <chrono> |
||||||
|
#include <cstring> |
||||||
|
#include <memory> |
||||||
|
#include <string> |
||||||
|
#include <utility> |
||||||
|
#include <vector> |
||||||
|
|
||||||
|
#include "absl/memory/memory.h" |
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/status/statusor.h" |
||||||
|
#include "absl/strings/str_cat.h" |
||||||
|
#include "gtest/gtest.h" |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
||||||
|
#include "src/core/lib/event_engine/posix_engine/posix_engine.h" |
||||||
|
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" |
||||||
|
#include "src/core/lib/experiments/experiments.h" |
||||||
|
#include "src/core/lib/gprpp/notification.h" |
||||||
|
#include "src/core/lib/resource_quota/memory_quota.h" |
||||||
|
#include "src/core/lib/resource_quota/resource_quota.h" |
||||||
|
#include "test/core/event_engine/test_suite/event_engine_test_utils.h" |
||||||
|
#include "test/core/util/port.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
namespace grpc_event_engine { |
||||||
|
namespace posix_engine { |
||||||
|
|
||||||
|
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; |
||||||
|
using ::grpc_event_engine::experimental::EventEngine; |
||||||
|
using ::grpc_event_engine::experimental::PosixEventEngine; |
||||||
|
using ::grpc_event_engine::experimental::URIToResolvedAddress; |
||||||
|
using ::grpc_event_engine::experimental::WaitForSingleOwner; |
||||||
|
using namespace std::chrono_literals; |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
// Creates a server socket listening for one connection on a specific port. It
|
||||||
|
// then creates another client socket connected to the server socket. This fills
|
||||||
|
// up the kernel listen queue on the server socket. Any subsequent attempts to
|
||||||
|
// connect to the server socket will be pending indefinitely. This can be used
|
||||||
|
// to test Connection timeouts and cancellation attempts.
|
||||||
|
std::vector<int> CreateConnectedSockets( |
||||||
|
EventEngine::ResolvedAddress resolved_addr) { |
||||||
|
int server_socket; |
||||||
|
int opt = -1; |
||||||
|
int client_socket; |
||||||
|
int one = 1; |
||||||
|
int flags; |
||||||
|
std::vector<int> ret_sockets; |
||||||
|
// Creating a new socket file descriptor.
|
||||||
|
if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) { |
||||||
|
gpr_log(GPR_ERROR, "Error creating socket: %s", std::strerror(errno)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
// MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
|
||||||
|
// setsockopt syscall. So they are set separately one after the other.
|
||||||
|
if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) { |
||||||
|
gpr_log(GPR_ERROR, "Error setsockopt(SO_REUSEADDR): %s", |
||||||
|
std::strerror(errno)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) { |
||||||
|
gpr_log(GPR_ERROR, "Error setsockopt(SO_REUSEPORT): %s", |
||||||
|
std::strerror(errno)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
|
||||||
|
// Bind the new socket to server address.
|
||||||
|
if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) { |
||||||
|
gpr_log(GPR_ERROR, "Error bind: %s", std::strerror(errno)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
// Set the new socket to listen for one active connection at a time.
|
||||||
|
// accept() is intentionally not called on the socket. This allows the
|
||||||
|
// connection queue to build up.
|
||||||
|
if (listen(server_socket, 1) < 0) { |
||||||
|
gpr_log(GPR_ERROR, "Error listen: %s", std::strerror(errno)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
ret_sockets.push_back(server_socket); |
||||||
|
// Create and connect client sockets until the connection attempt times out.
|
||||||
|
// Even if the backlog specified to listen is 1, the kernel continues to
|
||||||
|
// accept a certain number of SYN packets before dropping them. This loop
|
||||||
|
// attempts to identify the number of new connection attempts that will
|
||||||
|
// be allowed by the kernel before any subsequent connection attempts
|
||||||
|
// become pending indefinitely.
|
||||||
|
while (true) { |
||||||
|
client_socket = socket(AF_INET6, SOCK_STREAM, 0); |
||||||
|
setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); |
||||||
|
// Make fd non-blocking.
|
||||||
|
flags = fcntl(client_socket, F_GETFL, 0); |
||||||
|
EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0); |
||||||
|
|
||||||
|
if (connect(client_socket, |
||||||
|
const_cast<struct sockaddr*>(resolved_addr.address()), |
||||||
|
resolved_addr.size()) == -1) { |
||||||
|
if (errno == EINPROGRESS) { |
||||||
|
struct pollfd pfd; |
||||||
|
pfd.fd = client_socket; |
||||||
|
pfd.events = POLLOUT; |
||||||
|
pfd.revents = 0; |
||||||
|
int ret = poll(&pfd, 1, 1000); |
||||||
|
if (ret == -1) { |
||||||
|
gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno); |
||||||
|
abort(); |
||||||
|
} else if (ret == 0) { |
||||||
|
// current connection attempt timed out. It indicates that the
|
||||||
|
// kernel will cause any subsequent connection attempts to
|
||||||
|
// become pending indefinitely.
|
||||||
|
ret_sockets.push_back(client_socket); |
||||||
|
return ret_sockets; |
||||||
|
} |
||||||
|
} else { |
||||||
|
gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
} |
||||||
|
ret_sockets.push_back(client_socket); |
||||||
|
} |
||||||
|
return ret_sockets; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) { |
||||||
|
std::string target_addr = absl::StrCat( |
||||||
|
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); |
||||||
|
auto resolved_addr = URIToResolvedAddress(target_addr); |
||||||
|
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>(); |
||||||
|
std::string resolved_addr_str = |
||||||
|
SockaddrToString(&resolved_addr, true).value(); |
||||||
|
auto sockets = CreateConnectedSockets(resolved_addr); |
||||||
|
grpc_core::Notification signal; |
||||||
|
grpc_core::ChannelArgs args; |
||||||
|
auto quota = grpc_core::ResourceQuota::Default(); |
||||||
|
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); |
||||||
|
ChannelArgsEndpointConfig config(args); |
||||||
|
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar"); |
||||||
|
posix_ee->Connect( |
||||||
|
[&signal](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) { |
||||||
|
EXPECT_EQ(status.status().code(), absl::StatusCode::kCancelled); |
||||||
|
signal.Notify(); |
||||||
|
}, |
||||||
|
URIToResolvedAddress(target_addr), config, |
||||||
|
memory_quota->CreateMemoryAllocator("conn-1"), 3s); |
||||||
|
signal.WaitForNotification(); |
||||||
|
for (auto sock : sockets) { |
||||||
|
close(sock); |
||||||
|
} |
||||||
|
WaitForSingleOwner(std::move(posix_ee)); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) { |
||||||
|
std::string target_addr = absl::StrCat( |
||||||
|
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); |
||||||
|
auto resolved_addr = URIToResolvedAddress(target_addr); |
||||||
|
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>(); |
||||||
|
std::string resolved_addr_str = |
||||||
|
SockaddrToString(&resolved_addr, true).value(); |
||||||
|
auto sockets = CreateConnectedSockets(resolved_addr); |
||||||
|
grpc_core::ChannelArgs args; |
||||||
|
auto quota = grpc_core::ResourceQuota::Default(); |
||||||
|
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); |
||||||
|
ChannelArgsEndpointConfig config(args); |
||||||
|
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar"); |
||||||
|
auto connection_handle = posix_ee->Connect( |
||||||
|
[](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> /*status*/) { |
||||||
|
FAIL() << "The on_connect callback should not have run since the " |
||||||
|
"connection attempt was cancelled."; |
||||||
|
}, |
||||||
|
URIToResolvedAddress(target_addr), config, |
||||||
|
memory_quota->CreateMemoryAllocator("conn-2"), 3s); |
||||||
|
if (connection_handle.keys[0] > 0) { |
||||||
|
ASSERT_TRUE(posix_ee->CancelConnect(connection_handle)); |
||||||
|
} |
||||||
|
for (auto sock : sockets) { |
||||||
|
close(sock); |
||||||
|
} |
||||||
|
WaitForSingleOwner(std::move(posix_ee)); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace posix_engine
|
||||||
|
} // namespace grpc_event_engine
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
if (!grpc_core::IsPosixEventEngineEnablePollingEnabled()) { |
||||||
|
return 0; |
||||||
|
} |
||||||
|
grpc_init(); |
||||||
|
int ret = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return ret; |
||||||
|
} |
Loading…
Reference in new issue