Merge pull request #23104 from markdroth/chttp2_server_c++

Convert chttp2 server connection-handling code to C++.
reviewable/pr23245/r6
Mark D. Roth 5 years ago committed by GitHub
commit 728d02f4df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 69
      CMakeLists.txt
  2. 84
      Makefile
  3. 25
      build_autogenerated.yaml
  4. 656
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  5. 6
      src/core/ext/transport/chttp2/server/chttp2_server.h
  6. 2
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
  7. 2
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc
  8. 149
      src/core/lib/surface/server.cc
  9. 35
      src/core/lib/surface/server.h
  10. 3
      test/core/surface/BUILD
  11. 30
      test/core/surface/server_chttp2_test.cc
  12. 48
      tools/run_tests/generated/tests.json

@ -569,7 +569,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c secure_endpoint_test)
add_dependencies(buildtests_c security_connector_test)
add_dependencies(buildtests_c sequential_connectivity_test)
add_dependencies(buildtests_c server_chttp2_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c server_ssl_test)
endif()
@ -793,6 +792,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx server_builder_with_socket_mutator_test)
endif()
add_dependencies(buildtests_cxx server_chttp2_test)
add_dependencies(buildtests_cxx server_context_test_spouse_test)
add_dependencies(buildtests_cxx server_early_return_test)
add_dependencies(buildtests_cxx server_interceptors_end2end_test)
@ -6834,35 +6834,6 @@ target_link_libraries(sequential_connectivity_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(server_chttp2_test
test/core/surface/server_chttp2_test.cc
)
target_include_directories(server_chttp2_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(server_chttp2_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
address_sorting
upb
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -13008,6 +12979,44 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(server_chttp2_test
test/core/surface/server_chttp2_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(server_chttp2_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(server_chttp2_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
address_sorting
upb
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(server_context_test_spouse_test
test/cpp/test/server_context_test_spouse_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -1112,7 +1112,6 @@ secure_channel_create_test: $(BINDIR)/$(CONFIG)/secure_channel_create_test
secure_endpoint_test: $(BINDIR)/$(CONFIG)/secure_endpoint_test
security_connector_test: $(BINDIR)/$(CONFIG)/security_connector_test
sequential_connectivity_test: $(BINDIR)/$(CONFIG)/sequential_connectivity_test
server_chttp2_test: $(BINDIR)/$(CONFIG)/server_chttp2_test
server_ssl_test: $(BINDIR)/$(CONFIG)/server_ssl_test
server_test: $(BINDIR)/$(CONFIG)/server_test
slice_buffer_test: $(BINDIR)/$(CONFIG)/slice_buffer_test
@ -1269,6 +1268,7 @@ secure_auth_context_test: $(BINDIR)/$(CONFIG)/secure_auth_context_test
server_builder_plugin_test: $(BINDIR)/$(CONFIG)/server_builder_plugin_test
server_builder_test: $(BINDIR)/$(CONFIG)/server_builder_test
server_builder_with_socket_mutator_test: $(BINDIR)/$(CONFIG)/server_builder_with_socket_mutator_test
server_chttp2_test: $(BINDIR)/$(CONFIG)/server_chttp2_test
server_context_test_spouse_test: $(BINDIR)/$(CONFIG)/server_context_test_spouse_test
server_early_return_test: $(BINDIR)/$(CONFIG)/server_early_return_test
server_fuzzer: $(BINDIR)/$(CONFIG)/server_fuzzer
@ -1487,7 +1487,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/secure_endpoint_test \
$(BINDIR)/$(CONFIG)/security_connector_test \
$(BINDIR)/$(CONFIG)/sequential_connectivity_test \
$(BINDIR)/$(CONFIG)/server_chttp2_test \
$(BINDIR)/$(CONFIG)/server_ssl_test \
$(BINDIR)/$(CONFIG)/server_test \
$(BINDIR)/$(CONFIG)/slice_buffer_test \
@ -1631,6 +1630,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_builder_plugin_test \
$(BINDIR)/$(CONFIG)/server_builder_test \
$(BINDIR)/$(CONFIG)/server_builder_with_socket_mutator_test \
$(BINDIR)/$(CONFIG)/server_chttp2_test \
$(BINDIR)/$(CONFIG)/server_context_test_spouse_test \
$(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \
@ -1788,6 +1788,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_builder_plugin_test \
$(BINDIR)/$(CONFIG)/server_builder_test \
$(BINDIR)/$(CONFIG)/server_builder_with_socket_mutator_test \
$(BINDIR)/$(CONFIG)/server_chttp2_test \
$(BINDIR)/$(CONFIG)/server_context_test_spouse_test \
$(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \
@ -2043,8 +2044,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/secure_endpoint_test || ( echo test secure_endpoint_test failed ; exit 1 )
$(E) "[RUN] Testing security_connector_test"
$(Q) $(BINDIR)/$(CONFIG)/security_connector_test || ( echo test security_connector_test failed ; exit 1 )
$(E) "[RUN] Testing server_chttp2_test"
$(Q) $(BINDIR)/$(CONFIG)/server_chttp2_test || ( echo test server_chttp2_test failed ; exit 1 )
$(E) "[RUN] Testing server_ssl_test"
$(Q) $(BINDIR)/$(CONFIG)/server_ssl_test || ( echo test server_ssl_test failed ; exit 1 )
$(E) "[RUN] Testing server_test"
@ -2299,6 +2298,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/server_builder_test || ( echo test server_builder_test failed ; exit 1 )
$(E) "[RUN] Testing server_builder_with_socket_mutator_test"
$(Q) $(BINDIR)/$(CONFIG)/server_builder_with_socket_mutator_test || ( echo test server_builder_with_socket_mutator_test failed ; exit 1 )
$(E) "[RUN] Testing server_chttp2_test"
$(Q) $(BINDIR)/$(CONFIG)/server_chttp2_test || ( echo test server_chttp2_test failed ; exit 1 )
$(E) "[RUN] Testing server_context_test_spouse_test"
$(Q) $(BINDIR)/$(CONFIG)/server_context_test_spouse_test || ( echo test server_context_test_spouse_test failed ; exit 1 )
$(E) "[RUN] Testing server_early_return_test"
@ -10060,38 +10061,6 @@ endif
endif
SERVER_CHTTP2_TEST_SRC = \
test/core/surface/server_chttp2_test.cc \
SERVER_CHTTP2_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVER_CHTTP2_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/server_chttp2_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/server_chttp2_test: $(SERVER_CHTTP2_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(SERVER_CHTTP2_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/server_chttp2_test
endif
$(OBJDIR)/$(CONFIG)/test/core/surface/server_chttp2_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
deps_server_chttp2_test: $(SERVER_CHTTP2_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(SERVER_CHTTP2_TEST_OBJS:.o=.dep)
endif
endif
SERVER_SSL_TEST_SRC = \
test/core/handshake/server_ssl.cc \
test/core/handshake/server_ssl_common.cc \
@ -17227,6 +17196,49 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/server/server_builder_with_socket_mutator_test.o: $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.grpc.pb.cc
SERVER_CHTTP2_TEST_SRC = \
test/core/surface/server_chttp2_test.cc \
SERVER_CHTTP2_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVER_CHTTP2_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/server_chttp2_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/server_chttp2_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/server_chttp2_test: $(PROTOBUF_DEP) $(SERVER_CHTTP2_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(SERVER_CHTTP2_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/server_chttp2_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/surface/server_chttp2_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
deps_server_chttp2_test: $(SERVER_CHTTP2_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(SERVER_CHTTP2_TEST_OBJS:.o=.dep)
endif
endif
SERVER_CONTEXT_TEST_SPOUSE_TEST_SRC = \
test/cpp/test/server_context_test_spouse_test.cc \

@ -4142,18 +4142,6 @@ targets:
- gpr
- address_sorting
- upb
- name: server_chttp2_test
build: test
language: c
headers: []
src:
- test/core/surface/server_chttp2_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- address_sorting
- upb
- name: server_ssl_test
build: test
language: c
@ -6938,6 +6926,19 @@ targets:
- linux
- posix
- mac
- name: server_chttp2_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/surface/server_chttp2_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- address_sorting
- upb
- name: server_context_test_spouse_test
gtest: true
build: test

@ -39,6 +39,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resource_quota.h"
@ -47,85 +49,159 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
struct server_state {
grpc_server* server;
grpc_tcp_server* tcp_server;
grpc_channel_args* args;
gpr_mu mu;
bool shutdown;
grpc_closure tcp_server_shutdown_complete;
grpc_closure* server_destroy_listener_done;
grpc_core::HandshakeManager* pending_handshake_mgrs;
grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode>
channelz_listen_socket;
};
namespace grpc_core {
namespace {
class Chttp2ServerListener : public ServerListenerInterface {
public:
static grpc_error* Create(grpc_server* server, const char* addr,
grpc_channel_args* args, int* port_num);
static grpc_error* CreateWithAcceptor(grpc_server* server, const char* name,
grpc_channel_args* args);
// Do not instantiate directly. Use one of the factory methods above.
Chttp2ServerListener(grpc_server* server, grpc_channel_args* args);
~Chttp2ServerListener();
void Start(grpc_server* server, grpc_pollset** pollsets,
size_t npollsets) override;
channelz::ListenSocketNode* channelz_listen_socket_node() const override {
return channelz_listen_socket_.get();
}
void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
void Orphan() override;
struct server_connection_state {
gpr_refcount refs;
server_state* svr_state;
grpc_pollset* accepting_pollset;
grpc_tcp_server_acceptor* acceptor;
grpc_core::RefCountedPtr<grpc_core::HandshakeManager> handshake_mgr;
private:
class ConnectionState : public RefCounted<ConnectionState> {
public:
ConnectionState(Chttp2ServerListener* listener,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
RefCountedPtr<HandshakeManager> handshake_mgr,
grpc_channel_args* args, grpc_endpoint* endpoint);
~ConnectionState();
private:
static void OnTimeout(void* arg, grpc_error* error);
static void OnReceiveSettings(void* arg, grpc_error* error);
static void OnHandshakeDone(void* arg, grpc_error* error);
Chttp2ServerListener* const listener_;
grpc_pollset* const accepting_pollset_;
grpc_tcp_server_acceptor* const acceptor_;
RefCountedPtr<HandshakeManager> handshake_mgr_;
// State for enforcing handshake timeout on receiving HTTP/2 settings.
grpc_chttp2_transport* transport;
grpc_millis deadline;
grpc_timer timer;
grpc_closure on_timeout;
grpc_closure on_receive_settings;
grpc_pollset_set* interested_parties;
grpc_chttp2_transport* transport_ = nullptr;
grpc_millis deadline_;
grpc_timer timer_;
grpc_closure on_timeout_;
grpc_closure on_receive_settings_;
grpc_pollset_set* const interested_parties_;
};
static void server_connection_state_unref(
server_connection_state* connection_state) {
if (gpr_unref(&connection_state->refs)) {
if (connection_state->transport != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(connection_state->transport,
"receive settings timeout");
static void OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
RefCountedPtr<HandshakeManager> CreateHandshakeManager();
static void TcpServerShutdownComplete(void* arg, grpc_error* error);
static void DestroyListener(grpc_server* /*server*/, void* arg,
grpc_closure* destroy_done);
grpc_server* const server_;
grpc_channel_args* const args_;
grpc_tcp_server* tcp_server_;
Mutex mu_;
bool shutdown_ = true;
grpc_closure tcp_server_shutdown_complete_;
grpc_closure* on_destroy_done_ = nullptr;
HandshakeManager* pending_handshake_mgrs_ = nullptr;
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
};
//
// Chttp2ServerListener::ConnectionState
//
grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
int timeout_ms =
grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
{120 * GPR_MS_PER_SEC, 1, INT_MAX});
return ExecCtx::Get()->Now() + timeout_ms;
}
grpc_pollset_set_del_pollset(connection_state->interested_parties,
connection_state->accepting_pollset);
grpc_pollset_set_destroy(connection_state->interested_parties);
gpr_free(connection_state);
Chttp2ServerListener::ConnectionState::ConnectionState(
Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
grpc_endpoint* endpoint)
: listener_(listener),
accepting_pollset_(accepting_pollset),
acceptor_(acceptor),
handshake_mgr_(std::move(handshake_mgr)),
deadline_(GetConnectionDeadline(args)),
interested_parties_(grpc_pollset_set_create()) {
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
interested_parties_, handshake_mgr_.get());
handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
OnHandshakeDone, this);
}
Chttp2ServerListener::ConnectionState::~ConnectionState() {
if (transport_ != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
}
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
grpc_pollset_set_destroy(interested_parties_);
}
static void on_timeout(void* arg, grpc_error* error) {
server_connection_state* connection_state =
static_cast<server_connection_state*>(arg);
void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
grpc_error* error) {
ConnectionState* self = static_cast<ConnectionState*>(arg);
// Note that we may be called with GRPC_ERROR_NONE when the timer fires
// or with an error indicating that the timer system is being shut down.
if (error != GRPC_ERROR_CANCELLED) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Did not receive HTTP/2 settings before handshake timeout");
grpc_transport_perform_op(&connection_state->transport->base, op);
grpc_transport_perform_op(&self->transport_->base, op);
}
server_connection_state_unref(connection_state);
self->Unref();
}
static void on_receive_settings(void* arg, grpc_error* error) {
server_connection_state* connection_state =
static_cast<server_connection_state*>(arg);
void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
void* arg, grpc_error* error) {
ConnectionState* self = static_cast<ConnectionState*>(arg);
if (error == GRPC_ERROR_NONE) {
grpc_timer_cancel(&connection_state->timer);
grpc_timer_cancel(&self->timer_);
}
server_connection_state_unref(connection_state);
self->Unref();
}
static void on_handshake_done(void* arg, grpc_error* error) {
auto* args = static_cast<grpc_core::HandshakerArgs*>(arg);
server_connection_state* connection_state =
static_cast<server_connection_state*>(args->user_data);
gpr_mu_lock(&connection_state->svr_state->mu);
grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
connection_state->svr_state->server);
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
grpc_error* error) {
auto* args = static_cast<HandshakerArgs*>(arg);
ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
{
MutexLock lock(&self->listener_->mu_);
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(self->listener_->server_);
if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
connection_state->svr_state->server);
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(self->listener_->server_);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
@ -148,27 +224,31 @@ static void on_handshake_done(void* arg, grpc_error* error) {
grpc_transport* transport = grpc_create_chttp2_transport(
args->args, args->endpoint, false, resource_user);
grpc_server_setup_transport(
connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args,
grpc_chttp2_transport_get_socket_node(transport), resource_user);
self->listener_->server_, transport, self->accepting_pollset_,
args->args, grpc_chttp2_transport_get_socket_node(transport),
resource_user);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_state->transport =
reinterpret_cast<grpc_chttp2_transport*>(transport);
gpr_ref(&connection_state->refs);
GRPC_CLOSURE_INIT(&connection_state->on_receive_settings,
on_receive_settings, connection_state,
// Note: The reinterpret_cast<>s here are safe, because
// grpc_chttp2_transport is a C-style extension of
// grpc_transport, so this is morally equivalent of a
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(
transport, args->read_buffer, &connection_state->on_receive_settings);
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_);
grpc_channel_args_destroy(args->args);
gpr_ref(&connection_state->refs);
GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport,
self->Ref().release(); // Held by OnTimeout().
GRPC_CHTTP2_REF_TRANSPORT(
reinterpret_cast<grpc_chttp2_transport*>(transport),
"receive settings timeout");
GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout,
connection_state, grpc_schedule_on_exec_ctx);
grpc_timer_init(&connection_state->timer, connection_state->deadline,
&connection_state->on_timeout);
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
} else {
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
@ -176,276 +256,232 @@ static void on_handshake_done(void* arg, grpc_error* error) {
}
}
}
connection_state->handshake_mgr->RemoveFromPendingMgrList(
&connection_state->svr_state->pending_handshake_mgrs);
gpr_mu_unlock(&connection_state->svr_state->mu);
connection_state->handshake_mgr.reset();
gpr_free(connection_state->acceptor);
grpc_tcp_server_unref(connection_state->svr_state->tcp_server);
server_connection_state_unref(connection_state);
self->handshake_mgr_->RemoveFromPendingMgrList(
&self->listener_->pending_handshake_mgrs_);
}
self->handshake_mgr_.reset();
gpr_free(self->acceptor_);
grpc_tcp_server_unref(self->listener_->tcp_server_);
self->Unref();
}
static void on_accept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
server_state* state = static_cast<server_state*>(arg);
gpr_mu_lock(&state->mu);
if (state->shutdown) {
gpr_mu_unlock(&state->mu);
grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
return;
//
// Chttp2ServerListener
//
grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
grpc_channel_args* args,
int* port_num) {
std::vector<grpc_error*> error_list;
grpc_resolved_addresses* resolved = nullptr;
Chttp2ServerListener* listener = nullptr;
// The bulk of this method is inside of a lambda to make cleanup
// easier without using goto.
grpc_error* error = [&]() {
*port_num = -1;
/* resolve address */
grpc_error* error = grpc_blocking_resolve_address(addr, "https", &resolved);
if (error != GRPC_ERROR_NONE) return error;
// Create Chttp2ServerListener.
listener = new Chttp2ServerListener(server, args);
error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args, &listener->tcp_server_);
if (error != GRPC_ERROR_NONE) return error;
for (size_t i = 0; i < resolved->naddrs; i++) {
int port_temp;
error = grpc_tcp_server_add_port(listener->tcp_server_,
&resolved->addrs[i], &port_temp);
if (error != GRPC_ERROR_NONE) {
error_list.push_back(error);
} else {
if (*port_num == -1) {
*port_num = port_temp;
} else {
GPR_ASSERT(*port_num == port_temp);
}
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(state->server);
if (resource_user != nullptr &&
!grpc_resource_user_safe_alloc(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
gpr_log(
GPR_ERROR,
"Memory quota exhausted, rejecting the connection, no handshaking.");
gpr_mu_unlock(&state->mu);
grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
return;
}
auto handshake_mgr = grpc_core::MakeRefCounted<grpc_core::HandshakeManager>();
handshake_mgr->AddToPendingMgrList(&state->pending_handshake_mgrs);
grpc_tcp_server_ref(state->tcp_server);
gpr_mu_unlock(&state->mu);
server_connection_state* connection_state =
static_cast<server_connection_state*>(
gpr_zalloc(sizeof(*connection_state)));
gpr_ref_init(&connection_state->refs, 1);
connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
connection_state->interested_parties = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(connection_state->interested_parties,
connection_state->accepting_pollset);
grpc_core::HandshakerRegistry::AddHandshakers(
grpc_core::HANDSHAKER_SERVER, state->args,
connection_state->interested_parties,
connection_state->handshake_mgr.get());
const grpc_arg* timeout_arg =
grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
connection_state->deadline =
grpc_core::ExecCtx::Get()->Now() +
grpc_channel_arg_get_integer(timeout_arg,
{120 * GPR_MS_PER_SEC, 1, INT_MAX});
connection_state->handshake_mgr->DoHandshake(
tcp, state->args, connection_state->deadline, acceptor, on_handshake_done,
connection_state);
}
/* Server callback: start listening on our ports */
static void server_start_listener(grpc_server* /*server*/, void* arg,
grpc_pollset** pollsets,
size_t pollset_count) {
server_state* state = static_cast<server_state*>(arg);
gpr_mu_lock(&state->mu);
state->shutdown = false;
gpr_mu_unlock(&state->mu);
grpc_tcp_server_start(state->tcp_server, pollsets, pollset_count, on_accept,
state);
if (error_list.size() == resolved->naddrs) {
std::string msg =
absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
resolved->naddrs);
return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved->naddrs - error_list.size(), resolved->naddrs);
error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
GRPC_ERROR_UNREF(error);
/* we managed to bind some addresses: continue */
}
static void tcp_server_shutdown_complete(void* arg, grpc_error* error) {
server_state* state = static_cast<server_state*>(arg);
/* ensure all threads have unlocked */
gpr_mu_lock(&state->mu);
grpc_closure* destroy_done = state->server_destroy_listener_done;
GPR_ASSERT(state->shutdown);
if (state->pending_handshake_mgrs != nullptr) {
state->pending_handshake_mgrs->ShutdownAllPending(GRPC_ERROR_REF(error));
}
state->channelz_listen_socket.reset();
gpr_mu_unlock(&state->mu);
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
grpc_core::ExecCtx::Get()->Flush();
if (destroy_done != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_done,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Get()->Flush();
// Create channelz node.
if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
GRPC_ENABLE_CHANNELZ_DEFAULT)) {
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
addr, absl::StrFormat("chttp2 listener %s", addr));
}
/* Register with the server only upon success */
grpc_server_add_listener(server,
OrphanablePtr<ServerListenerInterface>(listener));
return GRPC_ERROR_NONE;
}();
if (resolved != nullptr) {
grpc_resolved_addresses_destroy(resolved);
}
grpc_channel_args_destroy(state->args);
gpr_mu_destroy(&state->mu);
gpr_free(state);
if (error != GRPC_ERROR_NONE) {
if (listener != nullptr) {
if (listener->tcp_server_ != nullptr) {
grpc_tcp_server_unref(listener->tcp_server_);
} else {
delete listener;
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
static void server_destroy_listener(grpc_server* /*server*/, void* arg,
grpc_closure* destroy_done) {
server_state* state = static_cast<server_state*>(arg);
gpr_mu_lock(&state->mu);
state->shutdown = true;
state->server_destroy_listener_done = destroy_done;
grpc_tcp_server* tcp_server = state->tcp_server;
gpr_mu_unlock(&state->mu);
grpc_tcp_server_shutdown_listeners(tcp_server);
grpc_tcp_server_unref(tcp_server);
} else {
grpc_channel_args_destroy(args);
}
*port_num = 0;
}
for (grpc_error* error : error_list) {
GRPC_ERROR_UNREF(error);
}
return error;
}
static grpc_error* chttp2_server_add_acceptor(grpc_server* server,
grpc_error* Chttp2ServerListener::CreateWithAcceptor(grpc_server* server,
const char* name,
grpc_channel_args* args) {
grpc_tcp_server* tcp_server = nullptr;
grpc_error* err = GRPC_ERROR_NONE;
server_state* state = nullptr;
const grpc_arg* arg = nullptr;
grpc_core::TcpServerFdHandler** arg_val = nullptr;
state = static_cast<server_state*>(gpr_zalloc(sizeof(*state)));
GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state,
grpc_schedule_on_exec_ctx);
err = grpc_tcp_server_create(&state->tcp_server_shutdown_complete, args,
&tcp_server);
if (err != GRPC_ERROR_NONE) {
goto error;
}
state->server = server;
state->tcp_server = tcp_server;
state->args = args;
state->shutdown = true;
gpr_mu_init(&state->mu);
Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
grpc_error* error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
if (error != GRPC_ERROR_NONE) {
delete listener;
return error;
}
// TODO(yangg) channelz
arg = grpc_channel_args_find(args, name);
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
arg_val = static_cast<grpc_core::TcpServerFdHandler**>(arg->value.pointer.p);
*arg_val = grpc_tcp_server_create_fd_handler(tcp_server);
grpc_server_add_listener(server, state, server_start_listener,
server_destroy_listener, /* node */ nullptr);
return err;
/* Error path: cleanup and return */
error:
GPR_ASSERT(err != GRPC_ERROR_NONE);
if (tcp_server) {
grpc_tcp_server_unref(tcp_server);
} else {
grpc_channel_args_destroy(args);
gpr_free(state);
TcpServerFdHandler** arg_val =
grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
grpc_server_add_listener(server,
OrphanablePtr<ServerListenerInterface>(listener));
return GRPC_ERROR_NONE;
}
return err;
Chttp2ServerListener::Chttp2ServerListener(grpc_server* server,
grpc_channel_args* args)
: server_(server), args_(args) {
GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
this, grpc_schedule_on_exec_ctx);
}
grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr,
grpc_channel_args* args,
int* port_num) {
grpc_resolved_addresses* resolved = nullptr;
grpc_tcp_server* tcp_server = nullptr;
size_t i;
size_t count = 0;
int port_temp;
grpc_error* err = GRPC_ERROR_NONE;
server_state* state = nullptr;
grpc_error** errors = nullptr;
size_t naddrs = 0;
const grpc_arg* arg = nullptr;
Chttp2ServerListener::~Chttp2ServerListener() {
grpc_channel_args_destroy(args_);
}
*port_num = -1;
/* Server callback: start listening on our ports */
void Chttp2ServerListener::Start(grpc_server* /*server*/,
grpc_pollset** pollsets,
size_t pollset_count) {
{
MutexLock lock(&mu_);
shutdown_ = false;
}
grpc_tcp_server_start(tcp_server_, pollsets, pollset_count, OnAccept, this);
}
if (strncmp(addr, "external:", 9) == 0) {
return chttp2_server_add_acceptor(server, addr, args);
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
MutexLock lock(&mu_);
on_destroy_done_ = on_destroy_done;
}
/* resolve address */
err = grpc_blocking_resolve_address(addr, "https", &resolved);
if (err != GRPC_ERROR_NONE) {
goto error;
RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
MutexLock lock(&mu_);
if (shutdown_) return nullptr;
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(server_);
if (resource_user != nullptr &&
!grpc_resource_user_safe_alloc(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
gpr_log(GPR_ERROR,
"Memory quota exhausted, rejecting connection, no handshaking.");
return nullptr;
}
state = static_cast<server_state*>(gpr_zalloc(sizeof(*state)));
GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state,
grpc_schedule_on_exec_ctx);
err = grpc_tcp_server_create(&state->tcp_server_shutdown_complete, args,
&tcp_server);
if (err != GRPC_ERROR_NONE) {
goto error;
}
state->server = server;
state->tcp_server = tcp_server;
state->args = args;
state->shutdown = true;
gpr_mu_init(&state->mu);
naddrs = resolved->naddrs;
errors = static_cast<grpc_error**>(gpr_malloc(sizeof(*errors) * naddrs));
for (i = 0; i < naddrs; i++) {
errors[i] =
grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp);
if (errors[i] == GRPC_ERROR_NONE) {
if (*port_num == -1) {
*port_num = port_temp;
} else {
GPR_ASSERT(*port_num == port_temp);
auto handshake_mgr = MakeRefCounted<HandshakeManager>();
handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
grpc_tcp_server_ref(tcp_server_); // Ref held by ConnectionState.
return handshake_mgr;
}
count++;
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
RefCountedPtr<HandshakeManager> handshake_mgr =
self->CreateHandshakeManager();
if (handshake_mgr == nullptr) {
grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
return;
}
// Deletes itself when done.
new ConnectionState(self, accepting_pollset, acceptor,
std::move(handshake_mgr), self->args_, tcp);
}
if (count == 0) {
char* msg;
gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
naddrs);
err = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(msg, errors, naddrs);
gpr_free(msg);
goto error;
} else if (count != naddrs) {
char* msg;
gpr_asprintf(&msg,
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
count, naddrs);
err = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(msg, errors, naddrs);
gpr_free(msg);
const char* warning_message = grpc_error_string(err);
gpr_log(GPR_INFO, "WARNING: %s", warning_message);
/* we managed to bind some addresses: continue */
void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
grpc_error* error) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
/* ensure all threads have unlocked */
grpc_closure* destroy_done = nullptr;
{
MutexLock lock(&self->mu_);
destroy_done = self->on_destroy_done_;
GPR_ASSERT(self->shutdown_);
if (self->pending_handshake_mgrs_ != nullptr) {
self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
}
grpc_resolved_addresses_destroy(resolved);
arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
state->channelz_listen_socket =
grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>(
addr, absl::StrFormat("chttp2 listener %s", addr));
self->channelz_listen_socket_.reset();
}
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (destroy_done != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
ExecCtx::Get()->Flush();
}
delete self;
}
/* Register with the server only upon success */
grpc_server_add_listener(server, state, server_start_listener,
server_destroy_listener,
state->channelz_listen_socket);
goto done;
/* Error path: cleanup and return */
error:
GPR_ASSERT(err != GRPC_ERROR_NONE);
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
void Chttp2ServerListener::Orphan() {
grpc_tcp_server* tcp_server;
{
MutexLock lock(&mu_);
shutdown_ = true;
tcp_server = tcp_server_;
}
if (tcp_server) {
grpc_tcp_server_shutdown_listeners(tcp_server);
grpc_tcp_server_unref(tcp_server);
} else {
grpc_channel_args_destroy(args);
gpr_free(state);
}
*port_num = 0;
done:
if (errors != nullptr) {
for (i = 0; i < naddrs; i++) {
GRPC_ERROR_UNREF(errors[i]);
}
gpr_free(errors);
} // namespace
//
// Chttp2ServerAddPort()
//
grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
grpc_channel_args* args, int* port_num) {
if (strncmp(addr, "external:", 9) == 0) {
return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
args);
}
return err;
return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
}
} // namespace grpc_core

@ -25,9 +25,13 @@
#include "src/core/lib/iomgr/error.h"
namespace grpc_core {
/// Adds a port to \a server. Sets \a port_num to the port number.
/// Takes ownership of \a args.
grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr,
grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
grpc_channel_args* args, int* port_num);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */

@ -32,7 +32,7 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) {
int port_num = 0;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
grpc_error* err = grpc_chttp2_server_add_port(
grpc_error* err = grpc_core::Chttp2ServerAddPort(
server, addr,
grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num);
if (err != GRPC_ERROR_NONE) {

@ -72,7 +72,7 @@ int grpc_server_add_secure_http2_port(grpc_server* server, const char* addr,
grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server),
args_to_add, GPR_ARRAY_SIZE(args_to_add));
// Add server port.
err = grpc_chttp2_server_add_port(server, addr, args, &port_num);
err = grpc_core::Chttp2ServerAddPort(server, addr, args, &port_num);
done:
sc.reset(DEBUG_LOCATION, "server");

@ -24,12 +24,14 @@
#include <stdlib.h>
#include <string.h>
#include <list>
#include <utility>
#include <vector>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <utility>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h"
@ -57,13 +59,12 @@ namespace {
void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error);
struct listener {
void* arg;
void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
size_t pollset_count);
void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
struct listener* next;
intptr_t socket_uuid;
struct Listener {
explicit Listener(
grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> l)
: listener(std::move(l)) {}
grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener;
grpc_closure destroy_done;
};
@ -297,15 +298,15 @@ struct channel_broadcaster {
} // namespace
struct grpc_server {
grpc_channel_args* channel_args;
grpc_channel_args* channel_args = nullptr;
grpc_resource_user* default_resource_user;
grpc_resource_user* default_resource_user = nullptr;
grpc_completion_queue** cqs;
grpc_pollset** pollsets;
size_t cq_count;
size_t pollset_count;
bool started;
grpc_completion_queue** cqs = nullptr;
grpc_pollset** pollsets = nullptr;
size_t cq_count = 0;
size_t pollset_count = 0;
bool started = false;
/* The two following mutexes control access to server-state
mu_global controls access to non-call-related state (e.g., channel state)
@ -319,26 +320,26 @@ struct grpc_server {
/* startup synchronization: flag is protected by mu_global, signals whether
we are doing the listener start routine or not */
bool starting;
bool starting = false;
gpr_cv starting_cv;
// TODO(vjpai): Convert from a linked-list head pointer to a std::vector once
// grpc_server has a real constructor/destructor
registered_method* registered_methods;
registered_method* registered_methods = nullptr;
/** one request matcher for unregistered methods */
// TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
// constructor and destructor.
RequestMatcherInterface* unregistered_request_matcher;
RequestMatcherInterface* unregistered_request_matcher = nullptr;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
size_t num_shutdown_tags;
shutdown_tag* shutdown_tags;
gpr_atm shutdown_flag = 0;
uint8_t shutdown_published = 0;
size_t num_shutdown_tags = 0;
shutdown_tag* shutdown_tags = nullptr;
channel_data root_channel_data;
listener* listeners;
int listeners_destroyed;
std::list<Listener> listeners;
int listeners_destroyed = 0;
grpc_core::RefCount internal_refcount;
/** when did we print the last shutdown progress message */
@ -675,7 +676,6 @@ void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
server->channelz_server.reset();
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
@ -691,7 +691,7 @@ void server_delete(grpc_server* server) {
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
gpr_free(server);
delete server;
}
void server_unref(grpc_server* server) {
@ -881,15 +881,6 @@ void start_new_rpc(grpc_call_element* elem) {
GRPC_SRM_PAYLOAD_NONE);
}
int num_listeners(grpc_server* server) {
listener* l;
int n = 0;
for (l = server->listeners; l; l = l->next) {
n++;
}
return n;
}
void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) {
server_unref(static_cast<grpc_server*>(server));
}
@ -929,17 +920,17 @@ void maybe_finish_shutdown(grpc_server* server) {
gpr_mu_unlock(&server->mu_call);
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
server->listeners_destroyed < server->listeners.size()) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
server->last_shutdown_message_time),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_log(GPR_DEBUG,
"Waiting for %d channels and %d/%d listeners to be destroyed"
" before shutting down server",
"Waiting for %d channels and %" PRIuPTR "/%" PRIuPTR
" listeners to be destroyed before shutting down server",
num_channels(server),
num_listeners(server) - server->listeners_destroyed,
num_listeners(server));
server->listeners.size() - server->listeners_destroyed,
server->listeners.size());
}
return;
}
@ -1312,26 +1303,21 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server* server =
static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));
grpc_server* server = new grpc_server;
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
gpr_cv_init(&server->starting_cv);
/* decremented by grpc_server_destroy */
new (&server->internal_refcount) grpc_core::RefCount();
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
server->channel_args = grpc_channel_args_copy(args);
const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
arg = grpc_channel_args_find(
args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
arg,
if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
GRPC_ENABLE_CHANNELZ_DEFAULT)) {
size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
{GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
server->channelz_server =
grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
@ -1414,8 +1400,8 @@ void grpc_server_start(grpc_server* server) {
server->starting = true;
gpr_mu_unlock(&server->mu_global);
for (listener* l = server->listeners; l; l = l->next) {
l->start(server, l->arg, server->pollsets, server->pollset_count);
for (auto& listener : server->listeners) {
listener.listener->Start(server, server->pollsets, server->pollset_count);
}
gpr_mu_lock(&server->mu_global);
@ -1547,7 +1533,6 @@ void grpc_server_setup_transport(
*/
void grpc_server_shutdown_and_notify(grpc_server* server,
grpc_completion_queue* cq, void* tag) {
listener* l;
shutdown_tag* sdt;
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
@ -1599,13 +1584,18 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
gpr_mu_unlock(&server->mu_global);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
for (auto& listener : server->listeners) {
grpc_core::channelz::ListenSocketNode* channelz_listen_socket_node =
listener.listener->channelz_listen_socket_node();
if (server->channelz_server != nullptr &&
channelz_listen_socket_node != nullptr) {
server->channelz_server->RemoveChildListenSocket(
channelz_listen_socket_node->uuid());
}
GRPC_CLOSURE_INIT(&listener.destroy_done, listener_destroy_done, server,
grpc_schedule_on_exec_ctx);
l->destroy(server, l->arg, &l->destroy_done);
if (server->channelz_server != nullptr && l->socket_uuid != 0) {
server->channelz_server->RemoveChildListenSocket(l->socket_uuid);
}
listener.listener->SetOnDestroyDone(&listener.destroy_done);
listener.listener.reset();
}
channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
@ -1636,46 +1626,29 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
}
void grpc_server_destroy(grpc_server* server) {
listener* l;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
gpr_mu_lock(&server->mu_global);
GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
l = server->listeners;
server->listeners = l->next;
gpr_free(l);
}
GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) ||
server->listeners.empty());
GPR_ASSERT(server->listeners_destroyed == server->listeners.size());
gpr_mu_unlock(&server->mu_global);
server_unref(server);
}
void grpc_server_add_listener(
grpc_server* server, void* listener_arg,
void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
size_t pollset_count),
void (*destroy)(grpc_server* server, void* arg, grpc_closure* on_done),
grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node) {
listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
l->arg = listener_arg;
l->start = start;
l->destroy = destroy;
l->socket_uuid = 0;
if (node != nullptr) {
l->socket_uuid = node->uuid();
if (server->channelz_server != nullptr) {
server->channelz_server->AddChildListenSocket(std::move(node));
}
}
l->next = server->listeners;
server->listeners = l;
grpc_server* server,
grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener) {
grpc_core::channelz::ListenSocketNode* listen_socket_node =
listener->channelz_listen_socket_node();
if (listen_socket_node != nullptr && server->channelz_server != nullptr) {
server->channelz_server->AddChildListenSocket(listen_socket_node->Ref());
}
server->listeners.emplace_back(std::move(listener));
}
namespace {

@ -32,14 +32,35 @@ extern const grpc_channel_filter grpc_server_top_filter;
/** Lightweight tracing of server channel state */
extern grpc_core::TraceFlag grpc_server_channel_trace;
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
namespace grpc_core {
/// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener.
class ServerListenerInterface : public Orphanable {
public:
virtual ~ServerListenerInterface() = default;
/// Starts listening.
virtual void Start(grpc_server* server, grpc_pollset** pollsets,
size_t npollsets) = 0;
/// Returns the channelz node for the listen socket, or null if not
/// supported.
virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
/// Sets a closure to be invoked by the listener when its destruction
/// is complete.
virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
};
} // namespace grpc_core
/* Add a listener to the server: when the server starts, it will call Start(),
and when it shuts down, it will orphan the listener. */
void grpc_server_add_listener(
grpc_server* server, void* listener_arg,
void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
size_t npollsets),
void (*destroy)(grpc_server* server, void* arg, grpc_closure* on_done),
grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node);
grpc_server* server,
grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener);
/* Setup a transport - creates a channel stack, binds the transport to the
server */

@ -156,6 +156,9 @@ grpc_cc_test(
grpc_cc_test(
name = "server_chttp2_test",
srcs = ["server_chttp2_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:gpr",

@ -16,6 +16,8 @@
*
*/
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
@ -29,21 +31,19 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
void test_unparsable_target(void) {
TEST(ServerChttp2, UnparseableTarget) {
grpc_channel_args args = {0, nullptr};
grpc_server* server = grpc_server_create(&args, nullptr);
int port = grpc_server_add_insecure_http2_port(server, "[");
GPR_ASSERT(port == 0);
EXPECT_EQ(port, 0);
grpc_server_destroy(server);
}
// GRPC_ARG_ALLOW_REUSEPORT isn't supported for custom servers
#ifndef GRPC_UV
void test_add_same_port_twice() {
grpc_arg a;
a.type = GRPC_ARG_INTEGER;
a.key = const_cast<char*>(GRPC_ARG_ALLOW_REUSEPORT);
a.value.integer = 0;
TEST(ServerChttp2, AddSamePortTwice) {
grpc_arg a = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ALLOW_REUSEPORT), 0);
grpc_channel_args args = {1, &a};
int port = grpc_pick_unused_port_or_die();
@ -52,10 +52,10 @@ void test_add_same_port_twice() {
grpc_server_credentials* fake_creds =
grpc_fake_transport_security_server_credentials_create();
std::string addr = grpc_core::JoinHostPort("localhost", port);
GPR_ASSERT(
grpc_server_add_secure_http2_port(server, addr.c_str(), fake_creds));
GPR_ASSERT(
grpc_server_add_secure_http2_port(server, addr.c_str(), fake_creds) == 0);
EXPECT_EQ(grpc_server_add_secure_http2_port(server, addr.c_str(), fake_creds),
port);
EXPECT_EQ(grpc_server_add_secure_http2_port(server, addr.c_str(), fake_creds),
0);
grpc_server_credentials_release(fake_creds);
grpc_server_shutdown_and_notify(server, cq, nullptr);
@ -68,11 +68,9 @@ void test_add_same_port_twice() {
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc_init();
test_unparsable_target();
#ifndef GRPC_UV
test_add_same_port_twice();
#endif
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return 0;
return ret;
}

@ -2351,30 +2351,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "server_chttp2_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -5239,6 +5215,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "server_chttp2_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save