Add support for systemd socket activation (#31667)

* Revert "Revert "Add support for systemd socket activation (#30485)" (#31617)"

This reverts commit 867dc6cae2.

* Add checks to unix tests

* Ran generate_projects.sh and fixed styling in test

* Fix variable in unit test

* Use reinterpret_cast in test

* Rebase and fix sanity failures
pull/32037/head
Andres Beltran 2 years ago committed by GitHub
parent c952d37814
commit 2904ee8fd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      BUILD
  2. 10
      CMakeLists.txt
  3. 2
      Makefile
  4. 4
      bazel/grpc_build_system.bzl
  5. 8
      build_autogenerated.yaml
  6. 34
      cmake/modules/Findsystemd.cmake
  7. 20
      cmake/systemd.cmake
  8. 1
      config.m4
  9. 1
      config.w32
  10. 34
      examples/cpp/features/sd_sock_act/BUILD
  11. 100
      examples/cpp/features/sd_sock_act/client.cc
  12. 68
      examples/cpp/features/sd_sock_act/server.cc
  13. 73
      examples/cpp/features/sd_sock_act/test.sh
  14. 2
      gRPC-C++.podspec
  15. 3
      gRPC-Core.podspec
  16. 2
      grpc.gemspec
  17. 3
      grpc.gyp
  18. 2
      package.xml
  19. 16
      src/core/lib/iomgr/ev_epoll1_linux.cc
  20. 15
      src/core/lib/iomgr/ev_poll_posix.cc
  21. 6
      src/core/lib/iomgr/ev_posix.cc
  22. 5
      src/core/lib/iomgr/ev_posix.h
  23. 116
      src/core/lib/iomgr/systemd_utils.cc
  24. 33
      src/core/lib/iomgr/systemd_utils.h
  25. 8
      src/core/lib/iomgr/tcp_server.cc
  26. 8
      src/core/lib/iomgr/tcp_server.h
  27. 45
      src/core/lib/iomgr/tcp_server_posix.cc
  28. 3
      src/core/lib/iomgr/tcp_server_utils_posix.h
  29. 56
      src/core/lib/iomgr/tcp_server_utils_posix_common.cc
  30. 1
      src/python/grpcio/grpc_core_dependencies.py
  31. 6
      templates/CMakeLists.txt.template
  32. 223
      test/core/iomgr/tcp_server_posix_test.cc
  33. 3
      tools/distrib/fix_build_deps.py
  34. 2
      tools/doxygen/Doxyfile.c++.internal
  35. 2
      tools/doxygen/Doxyfile.core.internal

15
BUILD

@ -75,6 +75,11 @@ config_setting(
values = {"apple_platform_type": "ios"},
)
config_setting(
name = "systemd",
values = {"define": "use_systemd=true"},
)
selects.config_setting_group(
name = "grpc_no_xds",
match_any = [
@ -1212,6 +1217,7 @@ grpc_cc_library(
"//src/core:lib/iomgr/socket_utils_linux.cc",
"//src/core:lib/iomgr/socket_utils_posix.cc",
"//src/core:lib/iomgr/socket_windows.cc",
"//src/core:lib/iomgr/systemd_utils.cc",
"//src/core:lib/iomgr/tcp_client.cc",
"//src/core:lib/iomgr/tcp_client_cfstream.cc",
"//src/core:lib/iomgr/tcp_client_posix.cc",
@ -1311,6 +1317,7 @@ grpc_cc_library(
"//src/core:lib/iomgr/socket_factory_posix.h",
"//src/core:lib/iomgr/socket_utils_posix.h",
"//src/core:lib/iomgr/socket_windows.h",
"//src/core:lib/iomgr/systemd_utils.h",
"//src/core:lib/iomgr/tcp_client.h",
"//src/core:lib/iomgr/tcp_client_posix.h",
"//src/core:lib/iomgr/tcp_posix.h",
@ -1344,6 +1351,10 @@ grpc_cc_library(
"//src/core:lib/transport/transport.h",
"//src/core:lib/transport/transport_impl.h",
],
defines = select({
"systemd": ["HAVE_LIBSYSTEMD"],
"//conditions:default": [],
}),
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
@ -1363,6 +1374,10 @@ grpc_cc_library(
"madler_zlib",
],
language = "c++",
linkopts = select({
"systemd": ["-lsystemd"],
"//conditions:default": [],
}),
public_hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS,
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [

10
CMakeLists.txt generated

@ -327,6 +327,11 @@ include(cmake/xxhash.cmake)
include(cmake/zlib.cmake)
include(cmake/download_archive.cmake)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
include(cmake/systemd.cmake)
set(_gRPC_ALLTARGETS_LIBRARIES ${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_SYSTEMD_LIBRARIES})
endif()
# Setup external proto library at third_party/envoy-api with 2 download URLs
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/envoy-api)
# Download the archive via HTTP, validate the checksum, and extract to third_party/envoy-api.
@ -2217,6 +2222,7 @@ add_library(grpc
src/core/lib/iomgr/socket_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/iomgr/socket_windows.cc
src/core/lib/iomgr/systemd_utils.cc
src/core/lib/iomgr/tcp_client.cc
src/core/lib/iomgr/tcp_client_cfstream.cc
src/core/lib/iomgr/tcp_client_posix.cc
@ -2885,6 +2891,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/socket_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/iomgr/socket_windows.cc
src/core/lib/iomgr/systemd_utils.cc
src/core/lib/iomgr/tcp_client.cc
src/core/lib/iomgr/tcp_client_cfstream.cc
src/core/lib/iomgr/tcp_client_posix.cc
@ -4368,6 +4375,7 @@ add_library(grpc_authorization_provider
src/core/lib/iomgr/socket_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/iomgr/socket_windows.cc
src/core/lib/iomgr/systemd_utils.cc
src/core/lib/iomgr/tcp_client.cc
src/core/lib/iomgr/tcp_client_cfstream.cc
src/core/lib/iomgr/tcp_client_posix.cc
@ -11330,6 +11338,7 @@ add_executable(frame_test
src/core/lib/iomgr/socket_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/iomgr/socket_windows.cc
src/core/lib/iomgr/systemd_utils.cc
src/core/lib/iomgr/tcp_client.cc
src/core/lib/iomgr/tcp_client_cfstream.cc
src/core/lib/iomgr/tcp_client_posix.cc
@ -25120,6 +25129,7 @@ install(FILES
install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/Findc-ares.cmake
${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/Findre2.cmake
${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/Findsystemd.cmake
DESTINATION ${gRPC_INSTALL_CMAKEDIR}/modules
)

2
Makefile generated

@ -1492,6 +1492,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/socket_utils_posix.cc \
src/core/lib/iomgr/socket_utils_windows.cc \
src/core/lib/iomgr/socket_windows.cc \
src/core/lib/iomgr/systemd_utils.cc \
src/core/lib/iomgr/tcp_client.cc \
src/core/lib/iomgr/tcp_client_cfstream.cc \
src/core/lib/iomgr/tcp_client_posix.cc \
@ -2019,6 +2020,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/socket_utils_posix.cc \
src/core/lib/iomgr/socket_utils_windows.cc \
src/core/lib/iomgr/socket_windows.cc \
src/core/lib/iomgr/systemd_utils.cc \
src/core/lib/iomgr/tcp_client.cc \
src/core/lib/iomgr/tcp_client_cfstream.cc \
src/core/lib/iomgr/tcp_client_posix.cc \

@ -134,6 +134,7 @@ def grpc_cc_library(
alwayslink = 0,
data = [],
tags = [],
linkopts = [],
linkstatic = False):
"""An internal wrapper around cc_library.
@ -153,13 +154,14 @@ def grpc_cc_library(
alwayslink: Whether to enable alwayslink on the cc_library.
data: Data dependencies.
tags: Tags to apply to the rule.
linkopts: Extra libraries to link.
linkstatic: Whether to enable linkstatic on the cc_library.
"""
visibility = _update_visibility(visibility)
copts = []
if language.upper() == "C":
copts = copts + if_not_windows(["-std=c11"])
linkopts = if_not_windows(["-pthread"]) + if_windows(["-defaultlib:ws2_32.lib"])
linkopts = linkopts + if_not_windows(["-pthread"]) + if_windows(["-defaultlib:ws2_32.lib"])
if select_deps:
for select_deps_entry in select_deps:
deps += select(select_deps_entry)

@ -869,6 +869,7 @@ libs:
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
- src/core/lib/iomgr/systemd_utils.h
- src/core/lib/iomgr/tcp_client.h
- src/core/lib/iomgr/tcp_client_posix.h
- src/core/lib/iomgr/tcp_posix.h
@ -1593,6 +1594,7 @@ libs:
- src/core/lib/iomgr/socket_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/iomgr/socket_windows.cc
- src/core/lib/iomgr/systemd_utils.cc
- src/core/lib/iomgr/tcp_client.cc
- src/core/lib/iomgr/tcp_client_cfstream.cc
- src/core/lib/iomgr/tcp_client_posix.cc
@ -2159,6 +2161,7 @@ libs:
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
- src/core/lib/iomgr/systemd_utils.h
- src/core/lib/iomgr/tcp_client.h
- src/core/lib/iomgr/tcp_client_posix.h
- src/core/lib/iomgr/tcp_posix.h
@ -2519,6 +2522,7 @@ libs:
- src/core/lib/iomgr/socket_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/iomgr/socket_windows.cc
- src/core/lib/iomgr/systemd_utils.cc
- src/core/lib/iomgr/tcp_client.cc
- src/core/lib/iomgr/tcp_client_cfstream.cc
- src/core/lib/iomgr/tcp_client_posix.cc
@ -3590,6 +3594,7 @@ libs:
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
- src/core/lib/iomgr/systemd_utils.h
- src/core/lib/iomgr/tcp_client.h
- src/core/lib/iomgr/tcp_client_posix.h
- src/core/lib/iomgr/tcp_posix.h
@ -3831,6 +3836,7 @@ libs:
- src/core/lib/iomgr/socket_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/iomgr/socket_windows.cc
- src/core/lib/iomgr/systemd_utils.cc
- src/core/lib/iomgr/tcp_client.cc
- src/core/lib/iomgr/tcp_client_cfstream.cc
- src/core/lib/iomgr/tcp_client_posix.cc
@ -7304,6 +7310,7 @@ targets:
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
- src/core/lib/iomgr/systemd_utils.h
- src/core/lib/iomgr/tcp_client.h
- src/core/lib/iomgr/tcp_client_posix.h
- src/core/lib/iomgr/tcp_posix.h
@ -7527,6 +7534,7 @@ targets:
- src/core/lib/iomgr/socket_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/iomgr/socket_windows.cc
- src/core/lib/iomgr/systemd_utils.cc
- src/core/lib/iomgr/tcp_client.cc
- src/core/lib/iomgr/tcp_client_cfstream.cc
- src/core/lib/iomgr/tcp_client_posix.cc

@ -0,0 +1,34 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
find_package(systemd QUIET CONFIG)
if(systemd_FOUND)
message(STATUS "Found systemd via CMake.")
return()
endif()
if(TARGET systemd)
message(STATUS "Found systemd via pkg-config already?")
return()
endif()
find_package(PkgConfig)
pkg_check_modules(SYSTEMD libsystemd)
if(SYSTEMD_FOUND)
set(systemd_FOUND "${SYSTEMD_FOUND}")
add_library(systemd INTERFACE IMPORTED)
message(STATUS "Found systemd via pkg-config.")
endif()

@ -0,0 +1,20 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
find_package(systemd)
if(TARGET systemd)
set(_gRPC_SYSTEMD_LIBRARIES systemd ${SYSTEMD_LINK_LIBRARIES})
add_definitions(-DHAVE_LIBSYSTEMD)
endif()
set(_gRPC_FIND_SYSTEMD "if(NOT systemd_FOUND)\n find_package(systemd)\nendif()")

1
config.m4 generated

@ -617,6 +617,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/socket_utils_posix.cc \
src/core/lib/iomgr/socket_utils_windows.cc \
src/core/lib/iomgr/socket_windows.cc \
src/core/lib/iomgr/systemd_utils.cc \
src/core/lib/iomgr/tcp_client.cc \
src/core/lib/iomgr/tcp_client_cfstream.cc \
src/core/lib/iomgr/tcp_client_posix.cc \

1
config.w32 generated

@ -583,6 +583,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\socket_utils_posix.cc " +
"src\\core\\lib\\iomgr\\socket_utils_windows.cc " +
"src\\core\\lib\\iomgr\\socket_windows.cc " +
"src\\core\\lib\\iomgr\\systemd_utils.cc " +
"src\\core\\lib\\iomgr\\tcp_client.cc " +
"src\\core\\lib\\iomgr\\tcp_client_cfstream.cc " +
"src\\core\\lib\\iomgr\\tcp_client_posix.cc " +

@ -0,0 +1,34 @@
# Copyright 2022 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
licenses(["notice"])
cc_binary(
name = "client",
srcs = ["client.cc"],
deps = [
"//:grpc++",
"//examples/protos:helloworld_cc_grpc",
],
)
cc_binary(
name = "server",
srcs = ["server.cc"],
deps = [
"//:grpc++",
"//:grpc++_reflection",
"//examples/protos:helloworld_cc_grpc",
],
)

@ -0,0 +1,100 @@
// Copyright 2022 the gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <memory>
#include <string>
#include "examples/protos/helloworld.grpc.pb.h"
#include <grpcpp/grpcpp.h>
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// Assembles the client's payload, sends it and presents the response back
// from the server.
std::string SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
Status status = stub_->SayHello(&context, request, &reply);
// Act upon its status.
if (status.ok()) {
return reply.message();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC failed";
}
}
private:
std::unique_ptr<Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint specified by
// the argument "--target=" which is the only expected argument.
// We indicate that the channel isn't authenticated (use of
// InsecureChannelCredentials()).
std::string target_str;
std::string arg_str("--target");
if (argc > 1) {
std::string arg_val = argv[1];
size_t start_pos = arg_val.find(arg_str);
if (start_pos != std::string::npos) {
start_pos += arg_str.size();
if (arg_val[start_pos] == '=') {
target_str = arg_val.substr(start_pos + 1);
} else {
std::cout << "The only correct argument syntax is --target="
<< std::endl;
return 0;
}
} else {
std::cout << "The only acceptable argument is --target=" << std::endl;
return 0;
}
} else {
target_str = "unix:/tmp/server";
}
GreeterClient greeter(
grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
std::string user("world");
std::string reply(greeter.SayHello(user));
std::cout << "Greeter received: " << reply << std::endl;
return 0;
}

@ -0,0 +1,68 @@
// Copyright 2022 the gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <memory>
#include <string>
#include "examples/protos/helloworld.grpc.pb.h"
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
reply->set_message(prefix + request->name());
return Status::OK;
}
};
void RunServer() {
std::string server_address("unix:/tmp/server");
GreeterServiceImpl service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
RunServer();
return 0;
}

@ -0,0 +1,73 @@
#!/usr/bin/env bash
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test structure borrowed with gratitude from
# https://github.com/grpc/grpc-go/tree/master/examples/features
# Run this script as root
clean() {
echo "Cleaning..."
systemctl stop sdsockact.socket
systemctl stop sdsockact.service
systemctl daemon-reload
rm /tmp/greeter_server
rm /tmp/greeter_client
rm /etc/systemd/system/sdsockact.service
rm /etc/systemd/system/sdsockact.socket
}
fail() {
clean
echo "FAIL: $@" >&2
exit 1
}
pass() {
echo "SUCCESS: $1"
}
bazel build --define=use_systemd=true //examples/cpp/features/sd_sock_act:all || fail "Failed to build sd_sock_act"
cp ../../../../bazel-bin/examples/cpp/features/sd_sock_act/server /tmp/greeter_server
cp ../../../../bazel-bin/examples/cpp/features/sd_sock_act/client /tmp/greeter_client
cat << EOF > /etc/systemd/system/sdsockact.service
[Service]
ExecStart=/tmp/greeter_server
EOF
cat << EOF > /etc/systemd/system/sdsockact.socket
[Socket]
ListenStream=/tmp/server
ReusePort=true
[Install]
WantedBy=sockets.target
EOF
systemctl daemon-reload
systemctl enable sdsockact.socket
systemctl start sdsockact.socket
pushd /tmp
./greeter_client | grep "Hello"
if [ $? -ne 0 ]; then
popd
fail "Response not received"
fi
popd
pass "Response received"
clean

2
gRPC-C++.podspec generated

@ -845,6 +845,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/socket_utils.h',
'src/core/lib/iomgr/socket_utils_posix.h',
'src/core/lib/iomgr/socket_windows.h',
'src/core/lib/iomgr/systemd_utils.h',
'src/core/lib/iomgr/tcp_client.h',
'src/core/lib/iomgr/tcp_client_posix.h',
'src/core/lib/iomgr/tcp_posix.h',
@ -1738,6 +1739,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/socket_utils.h',
'src/core/lib/iomgr/socket_utils_posix.h',
'src/core/lib/iomgr/socket_windows.h',
'src/core/lib/iomgr/systemd_utils.h',
'src/core/lib/iomgr/tcp_client.h',
'src/core/lib/iomgr/tcp_client_posix.h',
'src/core/lib/iomgr/tcp_posix.h',

3
gRPC-Core.podspec generated

@ -1352,6 +1352,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
'src/core/lib/iomgr/socket_windows.h',
'src/core/lib/iomgr/systemd_utils.cc',
'src/core/lib/iomgr/systemd_utils.h',
'src/core/lib/iomgr/tcp_client.cc',
'src/core/lib/iomgr/tcp_client.h',
'src/core/lib/iomgr/tcp_client_cfstream.cc',
@ -2389,6 +2391,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/socket_utils.h',
'src/core/lib/iomgr/socket_utils_posix.h',
'src/core/lib/iomgr/socket_windows.h',
'src/core/lib/iomgr/systemd_utils.h',
'src/core/lib/iomgr/tcp_client.h',
'src/core/lib/iomgr/tcp_client_posix.h',
'src/core/lib/iomgr/tcp_posix.h',

2
grpc.gemspec generated

@ -1263,6 +1263,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/socket_utils_windows.cc )
s.files += %w( src/core/lib/iomgr/socket_windows.cc )
s.files += %w( src/core/lib/iomgr/socket_windows.h )
s.files += %w( src/core/lib/iomgr/systemd_utils.cc )
s.files += %w( src/core/lib/iomgr/systemd_utils.h )
s.files += %w( src/core/lib/iomgr/tcp_client.cc )
s.files += %w( src/core/lib/iomgr/tcp_client.h )
s.files += %w( src/core/lib/iomgr/tcp_client_cfstream.cc )

3
grpc.gyp generated

@ -905,6 +905,7 @@
'src/core/lib/iomgr/socket_utils_posix.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
'src/core/lib/iomgr/systemd_utils.cc',
'src/core/lib/iomgr/tcp_client.cc',
'src/core/lib/iomgr/tcp_client_cfstream.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',
@ -1374,6 +1375,7 @@
'src/core/lib/iomgr/socket_utils_posix.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
'src/core/lib/iomgr/systemd_utils.cc',
'src/core/lib/iomgr/tcp_client.cc',
'src/core/lib/iomgr/tcp_client_cfstream.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',
@ -1868,6 +1870,7 @@
'src/core/lib/iomgr/socket_utils_posix.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
'src/core/lib/iomgr/systemd_utils.cc',
'src/core/lib/iomgr/tcp_client.cc',
'src/core/lib/iomgr/tcp_client_cfstream.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',

2
package.xml generated

@ -1245,6 +1245,8 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_windows.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/systemd_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/systemd_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client_cfstream.cc" role="src" />

@ -156,6 +156,8 @@ struct grpc_fd {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
grpc_fork_fd_list* fork_fd_list;
bool is_pre_allocated;
};
static void fd_global_init(void);
@ -350,6 +352,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->error_closure->InitEvent();
new_fd->freelist_next = nullptr;
new_fd->is_pre_allocated = false;
std::string fd_name = absl::StrCat(name, " fd=", fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
@ -386,7 +389,9 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
bool releasing_fd) {
if (fd->read_closure->SetShutdown(why)) {
if (!releasing_fd) {
shutdown(fd->fd, SHUT_RDWR);
if (!fd->is_pre_allocated) {
shutdown(fd->fd, SHUT_RDWR);
}
} else {
// we need a phony event for earlier linux versions.
epoll_event phony_event;
@ -420,7 +425,9 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
if (is_release_fd) {
*release_fd = fd->fd;
} else {
close(fd->fd);
if (!fd->is_pre_allocated) {
close(fd->fd);
}
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
@ -459,6 +466,8 @@ static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }
//******************************************************************************
// Pollset Definitions
//
@ -1273,6 +1282,8 @@ const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
shutdown_background_closure,
/* shutdown_engine = */ []() {},
add_closure_to_background_poller,
fd_set_pre_allocated,
};
// Called by the child process's post-fork handler to close open fds, including
@ -1362,6 +1373,7 @@ const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
nullptr,
nullptr,
nullptr,
nullptr,
};
#endif // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#endif // !defined(GRPC_LINUX_EPOLL)

@ -125,6 +125,8 @@ struct grpc_fd {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
grpc_fork_fd_list* fork_fd_list;
bool is_pre_allocated;
};
// True when GRPC_ENABLE_FORK_SUPPORT=1.
@ -384,6 +386,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
r->is_pre_allocated = false;
gpr_atm_no_barrier_store(&r->pollhup, 0);
std::string name2 = absl::StrCat(name, " fd=", fd);
@ -438,7 +441,9 @@ static int has_watchers(grpc_fd* fd) {
static void close_fd_locked(grpc_fd* fd) {
fd->closed = 1;
if (!fd->released) {
close(fd->fd);
if (!fd->is_pre_allocated) {
close(fd->fd);
}
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure,
absl::OkStatus());
@ -547,7 +552,9 @@ static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
fd->shutdown = 1;
fd->shutdown_error = why;
// signal read/write closed to OS so that future operations fail
shutdown(fd->fd, SHUT_RDWR);
if (!fd->is_pre_allocated) {
shutdown(fd->fd, SHUT_RDWR);
}
set_ready_locked(fd, &fd->read_closure);
set_ready_locked(fd, &fd->write_closure);
}
@ -706,6 +713,8 @@ static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
GRPC_FD_UNREF(fd, "poll");
}
static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }
//******************************************************************************
// pollset_posix.c
//
@ -1405,6 +1414,8 @@ const grpc_event_engine_vtable grpc_ev_poll_posix = {
/* shutdown_engine = */ shutdown_background_closure,
[]() {},
add_closure_to_background_poller,
fd_set_pre_allocated,
};
namespace {

@ -227,6 +227,12 @@ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
}
void grpc_fd_set_pre_allocated(grpc_fd* fd) {
GRPC_POLLING_API_TRACE("fd_set_pre_allocated(%d)", grpc_fd_wrapped_fd(fd));
GRPC_FD_TRACE("fd_set_pre_allocated(%d)", grpc_fd_wrapped_fd(fd));
g_event_engine->fd_set_pre_allocated(fd);
}
void grpc_fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
GRPC_FD_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));

@ -91,6 +91,8 @@ typedef struct grpc_event_engine_vtable {
void (*shutdown_engine)(void);
bool (*add_closure_to_background_poller)(grpc_closure* closure,
grpc_error_handle error);
void (*fd_set_pre_allocated)(grpc_fd* fd);
} grpc_event_engine_vtable;
// register a new event engine factory
@ -179,6 +181,9 @@ void grpc_fd_set_writable(grpc_fd* fd);
//
void grpc_fd_set_error(grpc_fd* fd);
// Set the fd to be preallocated
void grpc_fd_set_pre_allocated(grpc_fd* fd);
// pollset_posix functions
// Add an fd to a pollset

@ -0,0 +1,116 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
#ifdef HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/systemd_utils.h"
#ifdef HAVE_LIBSYSTEMD
bool set_matching_sd_unix_fd(grpc_tcp_server* s,
const grpc_resolved_address* addr,
const int fd_start, const int n) {
absl::StatusOr<std::string> addr_name = grpc_sockaddr_to_string(addr, true);
for (int i = fd_start; i < fd_start + n; i++) {
if (sd_is_socket_unix(i, SOCK_STREAM, 1, addr_name.value().c_str(), 0)) {
grpc_tcp_server_set_pre_allocated_fd(s, i);
return true;
}
}
return false;
}
bool set_matching_sd_inet_fd(grpc_tcp_server* s,
const grpc_resolved_address* addr,
const int family, const int port,
const int fd_start, const int n) {
for (int i = fd_start; i < fd_start + n; i++) {
int r_inet = sd_is_socket_inet(i, family, SOCK_STREAM, 1, (uint16_t)port);
int r_addr = sd_is_socket_sockaddr(
i, SOCK_STREAM,
reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
addr->len, 1);
if (r_inet > 0 && r_addr > 0) {
grpc_tcp_server_set_pre_allocated_fd(s, i);
return true;
}
}
return false;
}
void set_matching_sd_fds(grpc_tcp_server* s, const grpc_resolved_address* addr,
int requested_port) {
int n = sd_listen_fds(0);
if (n <= 0) {
return;
}
int fd_start = SD_LISTEN_FDS_START;
grpc_resolved_address addr6_v4mapped;
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
int family = grpc_sockaddr_get_family(addr);
int port = grpc_sockaddr_get_port(addr);
if (family == AF_UNIX) {
set_matching_sd_unix_fd(s, addr, fd_start, n);
} else {
if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
grpc_resolved_address wild4;
grpc_resolved_address wild6;
grpc_resolved_address wildcard_addrs[2];
grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
wildcard_addrs[0] = wild4;
wildcard_addrs[1] = wild6;
for (grpc_resolved_address addr_w : wildcard_addrs) {
int family_w = grpc_sockaddr_get_family(&addr_w);
int port_w = grpc_sockaddr_get_port(&addr_w);
if (set_matching_sd_inet_fd(s, &addr_w, family_w, port_w, fd_start,
n)) {
return;
}
}
return;
}
set_matching_sd_inet_fd(s, addr, family, port, fd_start, n);
}
}
#else
void set_matching_sd_fds(GRPC_UNUSED grpc_tcp_server* s,
GRPC_UNUSED const grpc_resolved_address* addr,
GRPC_UNUSED int requested_port) {}
#endif /* HAVE_LIBSYSTEMD */
#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */

@ -0,0 +1,33 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_SYSTEMD_UTILS_H
#define GRPC_CORE_LIB_IOMGR_SYSTEMD_UTILS_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
/* Check whether systemd has pre-allocated FDs. If so, check whether any
* pre-allocated FD is valid, i.e. matches addr and its family. If there is
* any valid FD, set its value to s->pre_allocated_fd
*/
void set_matching_sd_fds(grpc_tcp_server* s, const grpc_resolved_address* addr,
int requested_port);
#endif /* GRPC_CORE_LIB_IOMGR_SYSTEMD_UTILS_H */

@ -73,6 +73,14 @@ void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {
grpc_tcp_server_impl->shutdown_listeners(s);
}
int grpc_tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
return grpc_tcp_server_impl->pre_allocated_fd(s);
}
void grpc_tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
grpc_tcp_server_impl->set_pre_allocated_fd(s, fd);
}
void grpc_set_tcp_server_impl(grpc_tcp_server_vtable* impl) {
grpc_tcp_server_impl = impl;
}

@ -81,6 +81,8 @@ typedef struct grpc_tcp_server_vtable {
grpc_closure* shutdown_starting);
void (*unref)(grpc_tcp_server* s);
void (*shutdown_listeners)(grpc_tcp_server* s);
int (*pre_allocated_fd)(grpc_tcp_server* s);
void (*set_pre_allocated_fd)(grpc_tcp_server* s, int fd);
} grpc_tcp_server_vtable;
// Create a server, initially not bound to any ports. The caller owns one ref.
@ -141,6 +143,12 @@ void grpc_tcp_server_unref(grpc_tcp_server* s);
// Shutdown the fds of listeners.
void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s);
/* Get pre-allocated FD for server. -1 if none is set */
int grpc_tcp_server_pre_allocated_fd(grpc_tcp_server* s);
/* Set pre-allocated FD for server */
void grpc_tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd);
void grpc_tcp_server_global_init();
void grpc_set_tcp_server_impl(grpc_tcp_server_vtable* impl);

@ -57,6 +57,7 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/systemd_utils.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
@ -101,6 +102,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
GPR_ASSERT(s->options.resource_quota != nullptr);
GPR_ASSERT(s->on_accept_cb);
s->memory_quota = s->options.resource_quota->memory_quota();
s->pre_allocated_fd = -1;
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return absl::OkStatus();
@ -149,7 +151,10 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
if (s->head) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
// Do not unlink if there is a pre-allocated FD
if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
}
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
@ -429,7 +434,6 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
if (s->tail != nullptr) {
port_index = s->tail->port_index + 1;
}
grpc_unlink_if_unix_domain_socket(addr);
// Check if this is a wildcard port, and if so, try to keep the port the same
// as some previously created listener.
@ -452,6 +456,16 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
}
}
}
/* Check if systemd has pre-allocated valid FDs */
set_matching_sd_fds(s, addr, requested_port);
/* Do not unlink if there are pre-allocated FDs, or it will stop
working after the first client connects */
if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
grpc_unlink_if_unix_domain_socket(addr);
}
if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
return add_wildcard_addrs_to_server(s, port_index, requested_port,
out_port);
@ -579,6 +593,16 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_unlock(&s->mu);
}
static int tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
return s->pre_allocated_fd;
}
static void tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
gpr_mu_lock(&s->mu);
s->pre_allocated_fd = fd;
gpr_mu_unlock(&s->mu);
}
namespace {
class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
public:
@ -642,10 +666,17 @@ static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
}
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
tcp_server_port_fd_count, tcp_server_port_fd,
tcp_server_ref, tcp_server_shutdown_starting_add,
tcp_server_unref, tcp_server_shutdown_listeners};
tcp_server_create,
tcp_server_start,
tcp_server_add_port,
tcp_server_create_fd_handler,
tcp_server_port_fd_count,
tcp_server_port_fd,
tcp_server_ref,
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners,
tcp_server_pre_allocated_fd,
tcp_server_set_pre_allocated_fd};
#endif // GRPC_POSIX_SOCKET_TCP_SERVER

@ -98,6 +98,9 @@ struct grpc_tcp_server {
// used to create slice allocators for endpoints, owned
grpc_core::MemoryQuotaRefPtr memory_quota;
/* used to store a pre-allocated FD assigned to a socket */
int pre_allocated_fd;
};
// If successful, add a listener to \a s for \a addr, set \a dsmode for the

@ -111,6 +111,12 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name.c_str(), true);
// Check and set fd as prellocated
if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
grpc_fd_set_pre_allocated(sp->emfd);
}
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
@ -132,8 +138,36 @@ grpc_error_handle grpc_tcp_server_add_addr(grpc_tcp_server* s,
unsigned fd_index,
grpc_dualstack_mode* dsmode,
grpc_tcp_listener** listener) {
grpc_resolved_address addr4_copy;
int fd;
fd = grpc_tcp_server_pre_allocated_fd(s);
// Check if FD has been pre-allocated
if (fd > 0) {
int family = grpc_sockaddr_get_family(addr);
// Set dsmode value
if (family == AF_INET6) {
const int off = 0;
if (setsockopt(fd, 0, IPV6_V6ONLY, &off, sizeof(off)) == 0) {
*dsmode = GRPC_DSMODE_DUALSTACK;
} else if (!grpc_sockaddr_is_v4mapped(addr, nullptr)) {
*dsmode = GRPC_DSMODE_IPV6;
} else {
*dsmode = GRPC_DSMODE_IPV4;
}
} else {
*dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
}
grpc_resolved_address addr4_copy;
if (*dsmode == GRPC_DSMODE_IPV4 &&
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
addr = &addr4_copy;
}
return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
}
grpc_resolved_address addr4_copy;
grpc_error_handle err =
grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
if (!err.ok()) {
@ -187,15 +221,19 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
s->options);
if (!err.ok()) goto error;
if (bind(fd, reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
addr->len) < 0) {
err = GRPC_OS_ERROR(errno, "bind");
goto error;
}
// Only bind/listen if fd has not been already preallocated
if (grpc_tcp_server_pre_allocated_fd(s) != fd) {
if (bind(fd,
reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
addr->len) < 0) {
err = GRPC_OS_ERROR(errno, "bind");
goto error;
}
if (listen(fd, get_max_accept_queue_size()) < 0) {
err = GRPC_OS_ERROR(errno, "listen");
goto error;
if (listen(fd, get_max_accept_queue_size()) < 0) {
err = GRPC_OS_ERROR(errno, "listen");
goto error;
}
}
sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));

@ -592,6 +592,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/socket_utils_posix.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
'src/core/lib/iomgr/systemd_utils.cc',
'src/core/lib/iomgr/tcp_client.cc',
'src/core/lib/iomgr/tcp_client_cfstream.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',

@ -370,6 +370,11 @@
include(cmake/zlib.cmake)
include(cmake/download_archive.cmake)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
include(cmake/systemd.cmake)
set(_gRPC_ALLTARGETS_LIBRARIES <%text>${_gRPC_ALLTARGETS_LIBRARIES}</%text> <%text>${_gRPC_SYSTEMD_LIBRARIES}</%text>)
endif()
% for external_proto_library in external_proto_libraries:
% if len(external_proto_library.urls) > 1:
# Setup external proto library at ${external_proto_library.destination} with ${len(external_proto_library.urls)} download URLs
@ -840,6 +845,7 @@
install(FILES
<%text>${CMAKE_CURRENT_SOURCE_DIR}</%text>/cmake/modules/Findc-ares.cmake
<%text>${CMAKE_CURRENT_SOURCE_DIR}</%text>/cmake/modules/Findre2.cmake
<%text>${CMAKE_CURRENT_SOURCE_DIR}</%text>/cmake/modules/Findsystemd.cmake
DESTINATION <%text>${gRPC_INSTALL_CMAKEDIR}</%text>/modules
)

@ -34,6 +34,10 @@
#include <sys/types.h>
#include <unistd.h>
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
#endif
#include <string>
#include <grpc/grpc.h>
@ -467,6 +471,221 @@ static void test_connect(size_t num_connects,
ASSERT_EQ(weak_ref.server, nullptr);
}
static int pre_allocate_inet_sock(grpc_tcp_server* s, int family, int port,
int* fd) {
struct sockaddr_in6 address;
memset(&address, 0, sizeof(address));
address.sin6_family = family;
address.sin6_port = htons(port);
int pre_fd = socket(address.sin6_family, SOCK_STREAM, 0);
if (pre_fd < 0) {
gpr_log(GPR_ERROR, "Unable to create inet socket: %m");
return -1;
}
const int enable = 1;
setsockopt(pre_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
sizeof(address));
if (b < 0) {
gpr_log(GPR_ERROR, "Unable to bind inet socket: %m");
return -1;
}
int l = listen(pre_fd, SOMAXCONN);
if (l < 0) {
gpr_log(GPR_ERROR, "Unable to listen on inet socket: %m");
return -1;
}
grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
*fd = pre_fd;
return 0;
}
static void test_pre_allocated_inet_fd() {
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
struct sockaddr_in6* addr =
reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
absl::OkStatus(),
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
on_connect, nullptr, &s));
LOG_TEST("test_pre_allocated_inet_fd");
// Pre allocate FD
int pre_fd;
int port = grpc_pick_unused_port_or_die();
int res_pre = pre_allocate_inet_sock(s, AF_INET6, port, &pre_fd);
if (res_pre < 0) {
grpc_tcp_server_unref(s);
close(pre_fd);
return;
}
ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
// Add port
int pt;
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
addr->sin6_family = AF_INET6;
addr->sin6_port = htons(port);
ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
// Start server
std::vector<grpc_pollset*> test_pollset;
test_pollset.push_back(g_pollset);
grpc_tcp_server_start(s, &test_pollset);
// Test connection
test_addr dst;
dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
(socklen_t*)&dst.addr.len),
0);
ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
test_addr_init_str(&dst);
on_connect_result result;
on_connect_result_init(&result);
ASSERT_EQ(tcp_connect(&dst, &result), absl::OkStatus());
ASSERT_EQ(result.server_fd, pre_fd);
ASSERT_EQ(result.server, s);
ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
result.server_fd);
grpc_tcp_server_unref(s);
close(pre_fd);
}
#ifdef GRPC_HAVE_UNIX_SOCKET
static int pre_allocate_unix_sock(grpc_tcp_server* s, const char* path,
int* fd) {
struct sockaddr_un address;
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
strcpy(address.sun_path, path);
int pre_fd = socket(address.sun_family, SOCK_STREAM, 0);
if (pre_fd < 0) {
gpr_log(GPR_ERROR, "Unable to create unix socket: %m");
return -1;
}
int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
sizeof(address));
if (b < 0) {
gpr_log(GPR_ERROR, "Unable to bind unix socket: %m");
return -1;
}
int l = listen(pre_fd, SOMAXCONN);
if (l < 0) {
gpr_log(GPR_ERROR, "Unable to listen on unix socket: %m");
return -1;
}
grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
*fd = pre_fd;
return 0;
}
static void test_pre_allocated_unix_fd() {
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
struct sockaddr_un* addr =
reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
absl::OkStatus(),
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
on_connect, nullptr, &s));
LOG_TEST("test_pre_allocated_unix_fd");
// Pre allocate FD
int pre_fd;
char path[100];
srand(time(nullptr));
memset(path, 0, sizeof(path));
sprintf(path, "/tmp/pre_fd_test_%d", rand());
int res_pre = pre_allocate_unix_sock(s, path, &pre_fd);
if (res_pre < 0) {
grpc_tcp_server_unref(s);
close(pre_fd);
unlink(path);
return;
}
ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
// Add port
int pt;
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_un));
addr->sun_family = AF_UNIX;
strcpy(addr->sun_path, path);
ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
// Start server
std::vector<grpc_pollset*> test_pollset;
test_pollset.push_back(g_pollset);
grpc_tcp_server_start(s, &test_pollset);
// Test connection
test_addr dst;
dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
(socklen_t*)&dst.addr.len),
0);
ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
test_addr_init_str(&dst);
on_connect_result result;
on_connect_result_init(&result);
grpc_error_handle res_conn = tcp_connect(&dst, &result);
// If the path no longer exists, errno is 2. This can happen when
// runninig the test multiple times in parallel. Do not fail the test
if (absl::IsUnknown(res_conn) && res_conn.raw_code() == 2) {
gpr_log(GPR_ERROR,
"Unable to test pre_allocated unix socket: path does not exist");
grpc_tcp_server_unref(s);
close(pre_fd);
return;
}
ASSERT_EQ(res_conn, absl::OkStatus());
ASSERT_EQ(result.server_fd, pre_fd);
ASSERT_EQ(result.server, s);
ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
result.server_fd);
grpc_tcp_server_unref(s);
close(pre_fd);
unlink(path);
}
#endif // GRPC_HAVE_UNIX_SOCKET
static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
}
@ -497,6 +716,10 @@ TEST(TcpServerPosixTest, MainTest) {
test_no_op_with_start();
test_no_op_with_port();
test_no_op_with_port_and_start();
test_pre_allocated_inet_fd();
#ifdef GRPC_HAVE_UNIX_SOCKET
test_pre_allocated_unix_fd();
#endif
if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
FAIL() << "getifaddrs: " << grpc_core::StrError(errno);

@ -547,6 +547,9 @@ def make_library(library):
if hdr in skip_headers[library]:
continue
if hdr == 'systemd/sd-daemon.h':
continue
if hdr == 'src/core/lib/profiling/stap_probes.h':
continue

@ -2257,6 +2257,8 @@ src/core/lib/iomgr/socket_utils_posix.h \
src/core/lib/iomgr/socket_utils_windows.cc \
src/core/lib/iomgr/socket_windows.cc \
src/core/lib/iomgr/socket_windows.h \
src/core/lib/iomgr/systemd_utils.cc \
src/core/lib/iomgr/systemd_utils.h \
src/core/lib/iomgr/tcp_client.cc \
src/core/lib/iomgr/tcp_client.h \
src/core/lib/iomgr/tcp_client_cfstream.cc \

@ -2039,6 +2039,8 @@ src/core/lib/iomgr/socket_utils_posix.h \
src/core/lib/iomgr/socket_utils_windows.cc \
src/core/lib/iomgr/socket_windows.cc \
src/core/lib/iomgr/socket_windows.h \
src/core/lib/iomgr/systemd_utils.cc \
src/core/lib/iomgr/systemd_utils.h \
src/core/lib/iomgr/tcp_client.cc \
src/core/lib/iomgr/tcp_client.h \
src/core/lib/iomgr/tcp_client_cfstream.cc \

Loading…
Cancel
Save