Support accepting external connections.

pull/18909/head
yang-g 6 years ago
parent c9a259aa3a
commit ac3a91edf8
  1. 2
      BUILD
  2. 2
      BUILD.gn
  3. 45
      CMakeLists.txt
  4. 51
      Makefile
  5. 15
      build.yaml
  6. 3
      gRPC-C++.podspec
  7. 2
      grpc.gyp
  8. 5
      include/grpcpp/impl/codegen/byte_buffer.h
  9. 28
      include/grpcpp/server.h
  10. 29
      include/grpcpp/server_builder_impl.h
  11. 7
      include/grpcpp/server_impl.h
  12. 57
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  13. 6
      src/core/lib/channel/handshaker.cc
  14. 5
      src/core/lib/iomgr/tcp_server.cc
  15. 22
      src/core/lib/iomgr/tcp_server.h
  16. 20
      src/core/lib/iomgr/tcp_server_custom.cc
  17. 90
      src/core/lib/iomgr/tcp_server_posix.cc
  18. 3
      src/core/lib/iomgr/tcp_server_utils_posix.h
  19. 20
      src/core/lib/iomgr/tcp_server_windows.cc
  20. 88
      src/cpp/server/external_connection_acceptor_impl.cc
  21. 71
      src/cpp/server/external_connection_acceptor_impl.h
  22. 17
      src/cpp/server/server_builder.cc
  23. 23
      src/cpp/server/server_cc.cc
  24. 18
      test/cpp/end2end/BUILD
  25. 362
      test/cpp/end2end/port_sharing_end2end_test.cc
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 22
      tools/run_tests/generated/sources_and_headers.json
  28. 24
      tools/run_tests/generated/tests.json

@ -141,6 +141,7 @@ GRPCXX_SRCS = [
"src/cpp/server/channel_argument_option.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
"src/cpp/server/external_connection_acceptor_impl.cc",
"src/cpp/server/health/default_health_check_service.cc",
"src/cpp/server/health/health_check_service.cc",
"src/cpp/server/health/health_check_service_server_builder_option.cc",
@ -160,6 +161,7 @@ GRPCXX_HDRS = [
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/external_connection_acceptor_impl.h",
"src/cpp/server/health/default_health_check_service.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/thread_manager/thread_manager.h",

@ -1324,6 +1324,8 @@ config("grpc_config") {
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/external_connection_acceptor_impl.cc",
"src/cpp/server/external_connection_acceptor_impl.h",
"src/cpp/server/health/default_health_check_service.cc",
"src/cpp/server/health/default_health_check_service.h",
"src/cpp/server/health/health_check_service.cc",

@ -668,6 +668,7 @@ add_dependencies(buildtests_cxx nonblocking_test)
add_dependencies(buildtests_cxx noop-benchmark)
add_dependencies(buildtests_cxx optional_test)
add_dependencies(buildtests_cxx orphanable_test)
add_dependencies(buildtests_cxx port_sharing_end2end_test)
add_dependencies(buildtests_cxx proto_server_reflection_test)
add_dependencies(buildtests_cxx proto_utils_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -2927,6 +2928,7 @@ add_library(grpc++
src/cpp/server/channel_argument_option.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
src/cpp/server/external_connection_acceptor_impl.cc
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
@ -3318,6 +3320,7 @@ add_library(grpc++_cronet
src/cpp/server/channel_argument_option.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
src/cpp/server/external_connection_acceptor_impl.cc
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
@ -4522,6 +4525,7 @@ add_library(grpc++_unsecure
src/cpp/server/channel_argument_option.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
src/cpp/server/external_connection_acceptor_impl.cc
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
@ -14822,6 +14826,47 @@ target_link_libraries(orphanable_test
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(port_sharing_end2end_test
test/cpp/end2end/port_sharing_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(port_sharing_end2end_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(port_sharing_end2end_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
test_tcp_server
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)

@ -1240,6 +1240,7 @@ nonblocking_test: $(BINDIR)/$(CONFIG)/nonblocking_test
noop-benchmark: $(BINDIR)/$(CONFIG)/noop-benchmark
optional_test: $(BINDIR)/$(CONFIG)/optional_test
orphanable_test: $(BINDIR)/$(CONFIG)/orphanable_test
port_sharing_end2end_test: $(BINDIR)/$(CONFIG)/port_sharing_end2end_test
proto_server_reflection_test: $(BINDIR)/$(CONFIG)/proto_server_reflection_test
proto_utils_test: $(BINDIR)/$(CONFIG)/proto_utils_test
qps_interarrival_test: $(BINDIR)/$(CONFIG)/qps_interarrival_test
@ -1709,6 +1710,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/noop-benchmark \
$(BINDIR)/$(CONFIG)/optional_test \
$(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/port_sharing_end2end_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
$(BINDIR)/$(CONFIG)/qps_interarrival_test \
@ -1853,6 +1855,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/noop-benchmark \
$(BINDIR)/$(CONFIG)/optional_test \
$(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/port_sharing_end2end_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
$(BINDIR)/$(CONFIG)/qps_interarrival_test \
@ -2358,6 +2361,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/optional_test || ( echo test optional_test failed ; exit 1 )
$(E) "[RUN] Testing orphanable_test"
$(Q) $(BINDIR)/$(CONFIG)/orphanable_test || ( echo test orphanable_test failed ; exit 1 )
$(E) "[RUN] Testing port_sharing_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/port_sharing_end2end_test || ( echo test port_sharing_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing proto_server_reflection_test"
$(Q) $(BINDIR)/$(CONFIG)/proto_server_reflection_test || ( echo test proto_server_reflection_test failed ; exit 1 )
$(E) "[RUN] Testing proto_utils_test"
@ -5302,6 +5307,7 @@ LIBGRPC++_SRC = \
src/cpp/server/channel_argument_option.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
src/cpp/server/external_connection_acceptor_impl.cc \
src/cpp/server/health/default_health_check_service.cc \
src/cpp/server/health/health_check_service.cc \
src/cpp/server/health/health_check_service_server_builder_option.cc \
@ -5702,6 +5708,7 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/server/channel_argument_option.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
src/cpp/server/external_connection_acceptor_impl.cc \
src/cpp/server/health/default_health_check_service.cc \
src/cpp/server/health/health_check_service.cc \
src/cpp/server/health/health_check_service_server_builder_option.cc \
@ -6853,6 +6860,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/server/channel_argument_option.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
src/cpp/server/external_connection_acceptor_impl.cc \
src/cpp/server/health/default_health_check_service.cc \
src/cpp/server/health/health_check_service.cc \
src/cpp/server/health/health_check_service_server_builder_option.cc \
@ -17756,6 +17764,49 @@ endif
endif
PORT_SHARING_END2END_TEST_SRC = \
test/cpp/end2end/port_sharing_end2end_test.cc \
PORT_SHARING_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(PORT_SHARING_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/port_sharing_end2end_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/port_sharing_end2end_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/port_sharing_end2end_test: $(PROTOBUF_DEP) $(PORT_SHARING_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libtest_tcp_server.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(PORT_SHARING_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libtest_tcp_server.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/port_sharing_end2end_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/port_sharing_end2end_test.o: $(LIBDIR)/$(CONFIG)/libtest_tcp_server.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_port_sharing_end2end_test: $(PORT_SHARING_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(PORT_SHARING_END2END_TEST_OBJS:.o=.dep)
endif
endif
PROTO_SERVER_REFLECTION_TEST_SRC = \
test/cpp/end2end/proto_server_reflection_test.cc \

@ -1408,6 +1408,7 @@ filegroups:
- src/cpp/client/create_channel_internal.h
- src/cpp/common/channel_filter.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/external_connection_acceptor_impl.h
- src/cpp/server/health/default_health_check_service.h
- src/cpp/server/thread_pool_interface.h
- src/cpp/thread_manager/thread_manager.h
@ -1432,6 +1433,7 @@ filegroups:
- src/cpp/server/channel_argument_option.cc
- src/cpp/server/create_default_thread_pool.cc
- src/cpp/server/dynamic_thread_pool.cc
- src/cpp/server/external_connection_acceptor_impl.cc
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
@ -5156,6 +5158,19 @@ targets:
- gpr
uses:
- grpc++_test
- name: port_sharing_end2end_test
gtest: true
build: test
language: c++
src:
- test/cpp/end2end/port_sharing_end2end_test.cc
deps:
- test_tcp_server
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr
- name: proto_server_reflection_test
gtest: true
build: test

@ -202,6 +202,7 @@ Pod::Spec.new do |s|
'src/cpp/client/create_channel_internal.h',
'src/cpp/common/channel_filter.h',
'src/cpp/server/dynamic_thread_pool.h',
'src/cpp/server/external_connection_acceptor_impl.h',
'src/cpp/server/health/default_health_check_service.h',
'src/cpp/server/thread_pool_interface.h',
'src/cpp/thread_manager/thread_manager.h',
@ -233,6 +234,7 @@ Pod::Spec.new do |s|
'src/cpp/server/channel_argument_option.cc',
'src/cpp/server/create_default_thread_pool.cc',
'src/cpp/server/dynamic_thread_pool.cc',
'src/cpp/server/external_connection_acceptor_impl.cc',
'src/cpp/server/health/default_health_check_service.cc',
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',
@ -566,6 +568,7 @@ Pod::Spec.new do |s|
'src/cpp/client/create_channel_internal.h',
'src/cpp/common/channel_filter.h',
'src/cpp/server/dynamic_thread_pool.h',
'src/cpp/server/external_connection_acceptor_impl.h',
'src/cpp/server/health/default_health_check_service.h',
'src/cpp/server/thread_pool_interface.h',
'src/cpp/thread_manager/thread_manager.h',

@ -1434,6 +1434,7 @@
'src/cpp/server/channel_argument_option.cc',
'src/cpp/server/create_default_thread_pool.cc',
'src/cpp/server/dynamic_thread_pool.cc',
'src/cpp/server/external_connection_acceptor_impl.cc',
'src/cpp/server/health/default_health_check_service.cc',
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',
@ -1589,6 +1590,7 @@
'src/cpp/server/channel_argument_option.cc',
'src/cpp/server/create_default_thread_pool.cc',
'src/cpp/server/dynamic_thread_pool.cc',
'src/cpp/server/external_connection_acceptor_impl.cc',
'src/cpp/server/health/default_health_check_service.cc',
'src/cpp/server/health/health_check_service.cc',
'src/cpp/server/health/health_check_service_server_builder_option.cc',

@ -29,6 +29,10 @@
#include <vector>
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
} // namespace grpc_impl
namespace grpc {
class ServerInterface;
@ -185,6 +189,7 @@ class ByteBuffer final {
friend class ProtoBufferReader;
friend class ProtoBufferWriter;
friend class internal::GrpcByteBufferPeer;
friend class ::grpc_impl::ExternalConnectionAcceptorImpl;
grpc_byte_buffer* buffer_;

@ -42,9 +42,9 @@
struct grpc_server;
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
class ServerInitializer;
}
} // namespace grpc_impl
namespace grpc {
class AsyncGenericService;
@ -174,15 +174,18 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
///
/// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
/// server completion queues passed via sync_server_cqs param.
Server(int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
grpc_resource_quota* server_rq = nullptr,
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
interceptor_creators = std::vector<std::unique_ptr<
experimental::ServerInterceptorFactoryInterface>>());
Server(
int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_resource_quota* server_rq = nullptr,
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
interceptor_creators = std::vector<std::unique_ptr<
experimental::ServerInterceptorFactoryInterface>>());
/// Start the server.
///
@ -262,6 +265,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
grpc_impl::ServerInitializer* initializer();
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
acceptors_;
// A vector of interceptor factory objects.
// This should be destroyed after health_check_service_ and this requirement
// is satisfied by declaring interceptor_creators_ before

@ -36,10 +36,11 @@
struct grpc_resource_quota;
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
class ResourceQuota;
class ServerCredentials;
} // namespace grpc_impl
namespace grpc {
class AsyncGenericService;
@ -47,7 +48,6 @@ class CompletionQueue;
class Server;
class ServerCompletionQueue;
class Service;
namespace testing {
class ServerBuilderPluginTest;
} // namespace testing
@ -55,7 +55,21 @@ class ServerBuilderPluginTest;
namespace experimental {
class CallbackGenericService;
}
class ExternalConnectionAcceptor {
public:
struct NewConnectionParameters {
int fd = -1;
ByteBuffer read_buffer; // data intended for the grpc server
};
virtual ~ExternalConnectionAcceptor() {}
// If called before grpc::Server is started, the new connection will be
// closed.
virtual void HandleNewConnection(NewConnectionParameters* p) = 0;
};
} // namespace grpc
namespace grpc_impl {
/// A builder class for the creation and startup of \a grpc::Server instances.
@ -257,6 +271,16 @@ class ServerBuilder {
/// at any time.
experimental_type experimental() { return experimental_type(this); }
enum ExternalConnectionType {
CONNECTION_FROM_FD = 0 // in the form of a file descriptor
};
// EXPERIMENTAL API:
// Create an acceptor to take in external connections and pass them to the
// gRPC server.
std::unique_ptr<grpc::ExternalConnectionAcceptor>
AddExternalConnectionAcceptor(ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds);
protected:
/// Experimental, to be deprecated
struct Port {
@ -347,6 +371,7 @@ class ServerBuilder {
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
interceptor_creators_;
std::vector<std::shared_ptr<ExternalConnectionAcceptorImpl>> acceptors_;
};
} // namespace grpc_impl

@ -48,7 +48,7 @@ class ServerContext;
} // namespace grpc
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
class HealthCheckServiceInterface;
class ServerInitializer;
@ -183,6 +183,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_resource_quota* server_rq = nullptr,
std::vector<std::unique_ptr<
grpc::experimental::ServerInterceptorFactoryInterface>>
@ -268,6 +270,9 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
grpc_impl::ServerInitializer* initializer();
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
acceptors_;
// A vector of interceptor factory objects.
// This should be destroyed after health_check_service_ and this requirement
// is satisfied by declaring interceptor_creators_ before

@ -20,12 +20,12 @@
#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include <grpc/grpc.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@ -289,6 +289,55 @@ static void server_destroy_listener(grpc_server* server, void* arg,
grpc_tcp_server_unref(tcp_server);
}
static grpc_error* chttp2_server_add_acceptor(grpc_server* server,
const char* name,
grpc_channel_args* args) {
grpc_tcp_server* tcp_server = nullptr;
grpc_error* err = GRPC_ERROR_NONE;
server_state* state = nullptr;
const grpc_arg* arg = nullptr;
grpc_core::TcpServerFdHandler** arg_val = nullptr;
state = static_cast<server_state*>(gpr_zalloc(sizeof(*state)));
GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state,
grpc_schedule_on_exec_ctx);
err = grpc_tcp_server_create(&state->tcp_server_shutdown_complete, args,
&tcp_server);
if (err != GRPC_ERROR_NONE) {
goto error;
}
state->server = server;
state->tcp_server = tcp_server;
state->args = args;
state->shutdown = true;
gpr_mu_init(&state->mu);
// TODO(yangg) channelz
arg = grpc_channel_args_find(args, name);
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
arg_val = static_cast<grpc_core::TcpServerFdHandler**>(arg->value.pointer.p);
*arg_val = grpc_tcp_server_create_fd_handler(tcp_server);
grpc_server_add_listener(server, state, server_start_listener,
server_destroy_listener, /* socket_uuid */ 0);
return err;
/* Error path: cleanup and return */
error:
GPR_ASSERT(err != GRPC_ERROR_NONE);
if (tcp_server) {
grpc_tcp_server_unref(tcp_server);
} else {
grpc_channel_args_destroy(args);
gpr_free(state);
}
return err;
}
grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr,
grpc_channel_args* args,
int* port_num) {
@ -306,6 +355,10 @@ grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr,
*port_num = -1;
if (strncmp(addr, "external:", 9) == 0) {
return chttp2_server_add_acceptor(server, addr, args);
}
/* resolve address */
err = grpc_blocking_resolve_address(addr, "https", &resolved);
if (err != GRPC_ERROR_NONE) {

@ -20,6 +20,7 @@
#include <string.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@ -226,6 +227,11 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
args_.read_buffer =
static_cast<grpc_slice_buffer*>(gpr_malloc(sizeof(*args_.read_buffer)));
grpc_slice_buffer_init(args_.read_buffer);
if (acceptor != nullptr && acceptor->external_connection &&
acceptor->pending_data != nullptr) {
grpc_slice_buffer_swap(args_.read_buffer,
&(acceptor->pending_data->data.raw.slice_buffer));
}
// Initialize state needed for calling handshakers.
acceptor_ = acceptor;
GRPC_CLOSURE_INIT(&call_next_handshaker_,

@ -41,6 +41,11 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
return grpc_tcp_server_impl->add_port(s, addr, out_port);
}
grpc_core::TcpServerFdHandler* grpc_tcp_server_create_fd_handler(
grpc_tcp_server* s) {
return grpc_tcp_server_impl->create_fd_handler(s);
}
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server* s,
unsigned port_index) {
return grpc_tcp_server_impl->port_fd_count(s, port_index);

@ -22,7 +22,9 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -37,6 +39,9 @@ typedef struct grpc_tcp_server_acceptor {
/* Indices that may be passed to grpc_tcp_server_port_fd(). */
unsigned port_index;
unsigned fd_index;
/* Data when the connection is passed to tcp_server from external. */
bool external_connection;
grpc_byte_buffer* pending_data;
} grpc_tcp_server_acceptor;
/* Called for newly connected TCP connections.
@ -44,6 +49,17 @@ typedef struct grpc_tcp_server_acceptor {
typedef void (*grpc_tcp_server_cb)(void* arg, grpc_endpoint* ep,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
namespace grpc_core {
// An interface for a handler to take a externally connected fd as a internal
// connection.
class TcpServerFdHandler {
public:
virtual ~TcpServerFdHandler() = default;
virtual void Handle(int fd, grpc_byte_buffer* pending_read) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS;
};
} // namespace grpc_core
typedef struct grpc_tcp_server_vtable {
grpc_error* (*create)(grpc_closure* shutdown_complete,
@ -54,6 +70,7 @@ typedef struct grpc_tcp_server_vtable {
void* cb_arg);
grpc_error* (*add_port)(grpc_tcp_server* s, const grpc_resolved_address* addr,
int* out_port);
grpc_core::TcpServerFdHandler* (*create_fd_handler)(grpc_tcp_server* s);
unsigned (*port_fd_count)(grpc_tcp_server* s, unsigned port_index);
int (*port_fd)(grpc_tcp_server* s, unsigned port_index, unsigned fd_index);
grpc_tcp_server* (*ref)(grpc_tcp_server* s);
@ -88,6 +105,11 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* out_port);
/* Create and return a TcpServerFdHandler so that it can be used by upper layer
to hand over an externally connected fd to the grpc server. */
grpc_core::TcpServerFdHandler* grpc_tcp_server_create_fd_handler(
grpc_tcp_server* s);
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index);

@ -233,6 +233,7 @@ static void finish_accept(grpc_tcp_listener* sp, grpc_custom_socket* socket) {
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
acceptor->external_connection = false;
sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, nullptr, acceptor);
gpr_free(peer_name_string);
}
@ -456,16 +457,17 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
}
}
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
grpc_tcp_server* s) {
return nullptr;
}
grpc_tcp_server_vtable custom_tcp_server_vtable = {
tcp_server_create,
tcp_server_start,
tcp_server_add_port,
tcp_server_port_fd_count,
tcp_server_port_fd,
tcp_server_ref,
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
tcp_server_port_fd_count, tcp_server_port_fd,
tcp_server_ref, tcp_server_shutdown_starting_add,
tcp_server_unref, tcp_server_shutdown_listeners};
#ifdef GRPC_UV_TEST
grpc_tcp_server_vtable* default_tcp_server_vtable = &custom_tcp_server_vtable;

@ -27,8 +27,6 @@
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_server.h"
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
@ -47,11 +45,14 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
@ -96,6 +97,7 @@ static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
s->tail = nullptr;
s->nports = 0;
s->channel_args = grpc_channel_args_copy(args);
s->fd_handler = nullptr;
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
@ -117,6 +119,7 @@ static void finish_shutdown(grpc_tcp_server* s) {
gpr_free(sp);
}
grpc_channel_args_destroy(s->channel_args);
grpc_core::Delete(s->fd_handler);
gpr_free(s);
}
@ -254,6 +257,7 @@ static void on_read(void* arg, grpc_error* err) {
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = sp->fd_index;
acceptor->external_connection = false;
sp->server->on_accept_cb(
sp->server->on_accept_cb_arg,
@ -562,14 +566,78 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_unlock(&s->mu);
}
namespace {
class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
public:
explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
// TODO(yangg) resolve duplicate code with on_read
void Handle(int fd, grpc_byte_buffer* buf) override {
grpc_pollset* read_notifier_pollset;
grpc_resolved_address addr;
char* addr_str;
char* name;
memset(&addr, 0, sizeof(addr));
addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
grpc_core::ExecCtx exec_ctx;
if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
&(addr.len)) < 0) {
gpr_log(GPR_ERROR, "Failed getpeername: %s", strerror(errno));
close(fd);
return;
}
grpc_set_socket_no_sigpipe_if_possible(fd);
addr_str = grpc_sockaddr_to_uri(&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
addr_str);
}
grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
s_->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s_->next_pollset_to_assign, 1)) %
s_->pollset_count];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = s_;
acceptor->port_index = -1;
acceptor->fd_index = -1;
acceptor->external_connection = true;
acceptor->pending_data = buf;
s_->on_accept_cb(s_->on_accept_cb_arg,
grpc_tcp_create(fdobj, s_->channel_args, addr_str),
read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
}
private:
grpc_tcp_server* s_;
};
} // namespace
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
grpc_tcp_server* s) {
s->fd_handler = grpc_core::New<ExternalConnectionHandler>(s);
return s->fd_handler;
}
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_create,
tcp_server_start,
tcp_server_add_port,
tcp_server_port_fd_count,
tcp_server_port_fd,
tcp_server_ref,
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
tcp_server_port_fd_count, tcp_server_port_fd,
tcp_server_ref, tcp_server_shutdown_starting_add,
tcp_server_unref, tcp_server_shutdown_listeners};
#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */

@ -92,6 +92,9 @@ struct grpc_tcp_server {
/* channel args for this server */
grpc_channel_args* channel_args;
/* a handler for external connections, owned */
grpc_core::TcpServerFdHandler* fd_handler;
};
/* If successful, add a listener to \a s for \a addr, set \a dsmode for the

@ -372,6 +372,7 @@ static void on_accept(void* arg, grpc_error* error) {
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
acceptor->external_connection = false;
sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
}
/* As we were notified from the IOCP of one and exactly one accept,
@ -545,16 +546,17 @@ static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
return -1;
}
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
grpc_tcp_server* s) {
return nullptr;
}
static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
tcp_server_create,
tcp_server_start,
tcp_server_add_port,
tcp_server_port_fd_count,
tcp_server_port_fd,
tcp_server_ref,
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
tcp_server_port_fd_count, tcp_server_port_fd,
tcp_server_ref, tcp_server_shutdown_starting_add,
tcp_server_unref, tcp_server_shutdown_listeners};
#endif /* GRPC_WINSOCK_SOCKET */

@ -0,0 +1,88 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include <memory>
#include <grpcpp/server_builder_impl.h>
#include <grpcpp/support/channel_arguments.h>
namespace grpc_impl {
namespace {
class InternalAcceptor : public grpc::ExternalConnectionAcceptor {
public:
explicit InternalAcceptor(
std::shared_ptr<ExternalConnectionAcceptorImpl> impl)
: impl_(std::move(impl)) {}
void HandleNewConnection(NewConnectionParameters* p) override {
impl_->HandleNewConnection(p);
}
private:
std::shared_ptr<ExternalConnectionAcceptorImpl> impl_;
};
} // namespace
ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl(
const grpc::string& name, ServerBuilder::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds)
: name_(name), creds_(std::move(creds)) {
GPR_ASSERT(type == ServerBuilder::ExternalConnectionType::CONNECTION_FROM_FD);
}
std::unique_ptr<grpc::ExternalConnectionAcceptor>
ExternalConnectionAcceptorImpl::GetAcceptor() {
std::lock_guard<std::mutex> lock(mu_);
GPR_ASSERT(!has_acceptor_);
has_acceptor_ = true;
return std::unique_ptr<grpc::ExternalConnectionAcceptor>(
new InternalAcceptor(shared_from_this()));
}
void ExternalConnectionAcceptorImpl::HandleNewConnection(
grpc::ExternalConnectionAcceptor::NewConnectionParameters* p) {
std::lock_guard<std::mutex> lock(mu_);
if (shutdown_ || !started_) {
// TODO(yangg) clean up.
return;
}
if (handler_) {
handler_->Handle(p->fd, p->read_buffer.c_buffer());
}
}
void ExternalConnectionAcceptorImpl::Shutdown() {
std::lock_guard<std::mutex> lock(mu_);
shutdown_ = true;
}
void ExternalConnectionAcceptorImpl::Start() {
std::lock_guard<std::mutex> lock(mu_);
GPR_ASSERT(!started_);
GPR_ASSERT(has_acceptor_);
GPR_ASSERT(!shutdown_);
started_ = true;
}
void ExternalConnectionAcceptorImpl::SetToChannelArgs(
::grpc::ChannelArguments* args) {
args->SetPointer(name_.c_str(), &handler_);
}
} // namespace grpc_impl

@ -0,0 +1,71 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
#define SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
#include <memory>
#include <mutex>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/security/server_credentials_impl.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/support/channel_arguments.h>
#include "src/core/lib/iomgr/tcp_server.h"
namespace grpc_impl {
typedef void (*RawConnectionHandler)(int fd, grpc_byte_buffer* buffer);
class ExternalConnectionAcceptorImpl
: public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> {
public:
ExternalConnectionAcceptorImpl(const grpc::string& name,
ServerBuilder::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds);
// Should only be called once.
std::unique_ptr<grpc::ExternalConnectionAcceptor> GetAcceptor();
void HandleNewConnection(
grpc::ExternalConnectionAcceptor::NewConnectionParameters* p);
void Shutdown();
void Start();
const char* name() { return name_.c_str(); }
ServerCredentials* GetCredentials() { return creds_.get(); }
void SetToChannelArgs(::grpc::ChannelArguments* args);
private:
const grpc::string name_;
std::shared_ptr<ServerCredentials> creds_;
grpc_core::TcpServerFdHandler* handler_ = nullptr; // not owned
std::mutex mu_;
bool has_acceptor_ = false;
bool started_ = false;
bool shutdown_ = false;
};
} // namespace grpc_impl
#endif // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_

@ -26,7 +26,9 @@
#include <utility>
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc_impl {
@ -307,8 +309,8 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<grpc::Server> server(new grpc::Server(
max_receive_message_size_, &args, sync_server_cqs,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec, resource_quota_,
std::move(interceptor_creators_)));
sync_server_settings_.cq_timeout_msec, std::move(acceptors_),
resource_quota_, std::move(interceptor_creators_)));
grpc_impl::ServerInitializer* initializer = server->initializer();
@ -409,4 +411,15 @@ ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
}
}
std::unique_ptr<grpc::ExternalConnectionAcceptor>
ServerBuilder::AddExternalConnectionAcceptor(
ExternalConnectionType type, std::shared_ptr<ServerCredentials> creds) {
grpc::string name_prefix("external:");
char count_str[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(static_cast<long>(acceptors_.size()), count_str);
acceptors_.emplace_back(std::make_shared<ExternalConnectionAcceptorImpl>(
name_prefix.append(count_str), type, creds));
return acceptors_.back()->GetAcceptor();
}
} // namespace grpc_impl

@ -28,6 +28,7 @@
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
@ -46,6 +47,7 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
@ -759,11 +761,14 @@ Server::Server(
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_resource_quota* server_rq,
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
interceptor_creators)
: interceptor_creators_(std::move(interceptor_creators)),
: acceptors_(std::move(acceptors)),
interceptor_creators_(std::move(interceptor_creators)),
max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
@ -797,6 +802,10 @@ Server::Server(
}
}
for (auto& acceptor : acceptors_) {
acceptor->SetToChannelArgs(args);
}
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
@ -1008,6 +1017,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
RegisterService(nullptr, default_health_check_service_impl);
}
for (auto& acceptor : acceptors_) {
acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_);
}
// If this server uses callback methods, then create a callback generic
// service to handle any unimplemented methods using the default reactor
// creator
@ -1052,6 +1065,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
for (auto& acceptor : acceptors_) {
acceptor->Start();
}
}
void Server::ShutdownInternal(gpr_timespec deadline) {
@ -1062,6 +1079,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
shutdown_ = true;
for (auto& acceptor : acceptors_) {
acceptor->Shutdown();
}
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag

@ -697,3 +697,21 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "port_sharing_end2end_test",
srcs = ["port_sharing_end2end_test.cc"],
external_deps = [
"gtest",
],
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,362 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>
#include <mutex>
#include <thread>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_tcp_server.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_posix.h"
namespace grpc {
namespace testing {
namespace {
class TestScenario {
public:
TestScenario(bool server_port, bool pending_data,
const grpc::string& creds_type)
: server_has_port(server_port),
queue_pending_data(pending_data),
credentials_type(creds_type) {}
void Log() const;
// server has its own port or not
bool server_has_port;
// whether tcp server should read some data before handoff
bool queue_pending_data;
const grpc::string credentials_type;
};
static std::ostream& operator<<(std::ostream& out,
const TestScenario& scenario) {
return out << "TestScenario{server_has_port="
<< (scenario.server_has_port ? "true" : "false")
<< ", queue_pending_data="
<< (scenario.queue_pending_data ? "true" : "false")
<< ", credentials='" << scenario.credentials_type << "'}";
}
void TestScenario::Log() const {
std::ostringstream out;
out << *this;
gpr_log(GPR_ERROR, "%s", out.str().c_str());
}
// Set up a test tcp server which is in charge of accepting connections and
// handing off the connections as fds.
class TestTcpServer {
public:
TestTcpServer()
: shutdown_(false),
queue_data_(false),
port_(grpc_pick_unused_port_or_die()) {
std::ostringstream server_address;
server_address << "localhost:" << port_;
address_ = server_address.str();
test_tcp_server_init(&tcp_server_, &TestTcpServer::OnConnect, this);
GRPC_CLOSURE_INIT(&on_fd_released_, &TestTcpServer::OnFdReleased, this,
grpc_schedule_on_exec_ctx);
}
~TestTcpServer() {
running_thread_.join();
test_tcp_server_destroy(&tcp_server_);
grpc_recycle_unused_port(port_);
}
// Read some data before handing off the connection.
void SetQueueData() { queue_data_ = true; }
void Start() {
test_tcp_server_start(&tcp_server_, port_);
gpr_log(GPR_INFO, "Test TCP server started at %s", address_.c_str());
}
const grpc::string& address() { return address_; }
void SetAcceptor(std::unique_ptr<ExternalConnectionAcceptor> acceptor) {
connection_acceptor_ = std::move(acceptor);
}
void Run() {
running_thread_ = std::thread([this]() {
while (true) {
{
std::lock_guard<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
}
test_tcp_server_poll(&tcp_server_, 1);
}
});
}
void Shutdown() {
std::lock_guard<std::mutex> lock(mu_);
shutdown_ = true;
}
static void OnConnect(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
auto* self = static_cast<TestTcpServer*>(arg);
self->OnConnect(tcp, accepting_pollset, acceptor);
}
static void OnFdReleased(void* arg, grpc_error* err) {
auto* self = static_cast<TestTcpServer*>(arg);
self->OnFdReleased(err);
}
private:
void OnConnect(grpc_endpoint* tcp, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
char* peer = grpc_endpoint_get_peer(tcp);
gpr_log(GPR_INFO, "Got incoming connection! from %s", peer);
gpr_free(peer);
EXPECT_FALSE(acceptor->external_connection);
gpr_free(acceptor);
grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_);
}
void OnFdReleased(grpc_error* err) {
EXPECT_EQ(GRPC_ERROR_NONE, err);
ExternalConnectionAcceptor::NewConnectionParameters p;
p.fd = fd_;
if (queue_data_) {
char buf[1024];
ssize_t read_bytes = 0;
while (read_bytes <= 0) {
read_bytes = read(fd_, buf, 1024);
}
Slice data(buf, read_bytes);
p.read_buffer = ByteBuffer(&data, 1);
}
gpr_log(GPR_INFO, "Handing off fd %d with data size %d", fd_,
static_cast<int>(p.read_buffer.Length()));
connection_acceptor_->HandleNewConnection(&p);
}
std::mutex mu_;
bool shutdown_;
int fd_;
bool queue_data_;
grpc_closure on_fd_released_;
std::thread running_thread_;
int port_;
grpc::string address_;
std::unique_ptr<ExternalConnectionAcceptor> connection_acceptor_;
test_tcp_server tcp_server_;
};
class PortSharingEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
PortSharingEnd2endTest() : is_server_started_(false), first_picked_port_(0) {
GetParam().Log();
}
void SetUp() override {
if (GetParam().queue_pending_data) {
tcp_server1_.SetQueueData();
tcp_server2_.SetQueueData();
}
tcp_server1_.Start();
tcp_server2_.Start();
ServerBuilder builder;
if (GetParam().server_has_port) {
int port = grpc_pick_unused_port_or_die();
first_picked_port_ = port;
server_address_ << "localhost:" << port;
auto creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), creds);
gpr_log(GPR_INFO, "gRPC server listening on %s",
server_address_.str().c_str());
}
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
auto acceptor1 = builder.AddExternalConnectionAcceptor(
ServerBuilder::ExternalConnectionType::CONNECTION_FROM_FD,
server_creds);
tcp_server1_.SetAcceptor(std::move(acceptor1));
auto acceptor2 = builder.AddExternalConnectionAcceptor(
ServerBuilder::ExternalConnectionType::CONNECTION_FROM_FD,
server_creds);
tcp_server2_.SetAcceptor(std::move(acceptor2));
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
is_server_started_ = true;
tcp_server1_.Run();
tcp_server2_.Run();
}
void TearDown() override {
tcp_server1_.Shutdown();
tcp_server2_.Shutdown();
if (is_server_started_) {
server_->Shutdown();
}
if (first_picked_port_ > 0) {
grpc_recycle_unused_port(first_picked_port_);
}
}
void ResetStubs() {
EXPECT_TRUE(is_server_started_);
ChannelArguments args;
args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &args);
channel_handoff1_ =
CreateCustomChannel(tcp_server1_.address(), channel_creds, args);
stub_handoff1_ = EchoTestService::NewStub(channel_handoff1_);
channel_handoff2_ =
CreateCustomChannel(tcp_server2_.address(), channel_creds, args);
stub_handoff2_ = EchoTestService::NewStub(channel_handoff2_);
if (GetParam().server_has_port) {
ChannelArguments direct_args;
direct_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
auto direct_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &direct_args);
channel_direct_ =
CreateCustomChannel(server_address_.str(), direct_creds, direct_args);
stub_direct_ = EchoTestService::NewStub(channel_direct_);
}
}
bool is_server_started_;
// channel/stub to the test tcp server, the connection will be handed to the
// grpc server.
std::shared_ptr<Channel> channel_handoff1_;
std::unique_ptr<EchoTestService::Stub> stub_handoff1_;
std::shared_ptr<Channel> channel_handoff2_;
std::unique_ptr<EchoTestService::Stub> stub_handoff2_;
// channel/stub to talk to the grpc server directly, if applicable.
std::shared_ptr<Channel> channel_direct_;
std::unique_ptr<EchoTestService::Stub> stub_direct_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
TestServiceImpl service_;
TestTcpServer tcp_server1_;
TestTcpServer tcp_server2_;
int first_picked_port_;
};
static void SendRpc(EchoTestService::Stub* stub, int num_rpcs) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello hello hello hello");
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
}
}
std::vector<TestScenario> CreateTestScenarios() {
std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types;
credentials_types = GetCredentialsProvider()->GetSecureCredentialsTypeList();
// Only allow insecure credentials type when it is registered with the
// provider. User may create providers that do not have insecure.
if (GetCredentialsProvider()->GetChannelCredentials(kInsecureCredentialsType,
nullptr) != nullptr) {
credentials_types.push_back(kInsecureCredentialsType);
}
GPR_ASSERT(!credentials_types.empty());
for (const auto& cred : credentials_types) {
for (auto server_has_port : {true, false}) {
for (auto queue_pending_data : {true, false}) {
scenarios.emplace_back(server_has_port, queue_pending_data, cred);
}
}
}
return scenarios;
}
TEST_P(PortSharingEnd2endTest, HandoffAndDirectCalls) {
ResetStubs();
SendRpc(stub_handoff1_.get(), 5);
if (GetParam().server_has_port) {
SendRpc(stub_direct_.get(), 5);
}
}
TEST_P(PortSharingEnd2endTest, MultipleHandoff) {
for (int i = 0; i < 3; i++) {
ResetStubs();
SendRpc(stub_handoff2_.get(), 1);
}
}
TEST_P(PortSharingEnd2endTest, TwoHandoffPorts) {
for (int i = 0; i < 3; i++) {
ResetStubs();
SendRpc(stub_handoff1_.get(), 5);
SendRpc(stub_handoff2_.get(), 5);
}
}
INSTANTIATE_TEST_CASE_P(PortSharingEnd2end, PortSharingEnd2endTest,
::testing::ValuesIn(CreateTestScenarios()));
} // namespace
} // namespace testing
} // namespace grpc
#endif // GRPC_POSIX_SOCKET_TCP_SERVER
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1241,6 +1241,8 @@ src/cpp/server/channel_argument_option.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.h \
src/cpp/server/external_connection_acceptor_impl.cc \
src/cpp/server/external_connection_acceptor_impl.h \
src/cpp/server/health/default_health_check_service.cc \
src/cpp/server/health/default_health_check_service.h \
src/cpp/server/health/health_check_service.cc \

@ -4283,6 +4283,25 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util",
"test_tcp_server"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "port_sharing_end2end_test",
"src": [
"test/cpp/end2end/port_sharing_end2end_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -10206,6 +10225,7 @@
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/external_connection_acceptor_impl.h",
"src/cpp/server/health/default_health_check_service.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/thread_manager/thread_manager.h"
@ -10347,6 +10367,8 @@
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/external_connection_acceptor_impl.cc",
"src/cpp/server/external_connection_acceptor_impl.h",
"src/cpp/server/health/default_health_check_service.cc",
"src/cpp/server/health/default_health_check_service.h",
"src/cpp/server/health/health_check_service.cc",

@ -5001,6 +5001,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "port_sharing_end2end_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save