diff --git a/BUILD b/BUILD index d0ea03ab388..d95808fe1d8 100644 --- a/BUILD +++ b/BUILD @@ -327,6 +327,7 @@ grpc_cc_library( "grpc_lb_policy_xds_cluster_manager", "grpc_lb_policy_xds_cluster_resolver", "grpc_resolver_xds", + "grpc_xds_server_config_fetcher", ], }, standalone = True, @@ -377,7 +378,8 @@ grpc_cc_library( select_deps = { "grpc_no_xds": [], "//conditions:default": [ - "grpc++_xds_credentials", + "grpc++_xds_client", + "grpc++_xds_server", ], }, standalone = True, @@ -393,16 +395,31 @@ grpc_cc_library( ) grpc_cc_library( - name = "grpc++_xds_credentials", + name = "grpc++_xds_client", srcs = [ "src/cpp/client/xds_credentials.cc", - "src/cpp/server/xds_server_credentials.cc", ], hdrs = [ "src/cpp/client/secure_credentials.h", + ], + language = "c++", + deps = [ + "grpc++_base", + ], +) + +grpc_cc_library( + name = "grpc++_xds_server", + srcs = [ + "src/cpp/server/xds_server_credentials.cc", + ], + hdrs = [ "src/cpp/server/secure_server_credentials.h", ], language = "c++", + public_hdrs = [ + "include/grpcpp/xds_server_builder.h", + ], deps = [ "grpc++_base", ], @@ -1384,6 +1401,17 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_xds_server_config_fetcher", + srcs = [ + "src/core/ext/xds/xds_server_config_fetcher.cc", + ], + language = "c++", + deps = [ + "grpc_xds_client", + ], +) + grpc_cc_library( name = "grpc_google_mesh_ca_certificate_provider_factory", srcs = [ diff --git a/BUILD.gn b/BUILD.gn index 26816b10f7f..cd0b2ce8e78 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -739,6 +739,7 @@ config("grpc_config") { "src/core/ext/xds/xds_client.h", "src/core/ext/xds/xds_client_stats.cc", "src/core/ext/xds/xds_client_stats.h", + "src/core/ext/xds/xds_server_config_fetcher.cc", "src/core/lib/avl/avl.cc", "src/core/lib/avl/avl.h", "src/core/lib/backoff/backoff.cc", @@ -1427,6 +1428,7 @@ config("grpc_config") { "include/grpcpp/support/sync_stream.h", "include/grpcpp/support/time.h", "include/grpcpp/support/validate_service_config.h", + "include/grpcpp/xds_server_builder.h", "src/cpp/client/channel_cc.cc", "src/cpp/client/client_callback.cc", "src/cpp/client/client_context.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index ffd6a24f801..929d022f8d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1721,6 +1721,7 @@ add_library(grpc src/core/ext/xds/xds_certificate_provider.cc src/core/ext/xds/xds_client.cc src/core/ext/xds/xds_client_stats.cc + src/core/ext/xds/xds_server_config_fetcher.cc src/core/lib/avl/avl.cc src/core/lib/backoff/backoff.cc src/core/lib/channel/channel_args.cc @@ -2953,6 +2954,7 @@ foreach(_hdr include/grpcpp/support/sync_stream.h include/grpcpp/support/time.h include/grpcpp/support/validate_service_config.h + include/grpcpp/xds_server_builder.h ) string(REPLACE "include/" "" _path ${_hdr}) get_filename_component(_path ${_path} PATH) diff --git a/Makefile b/Makefile index 521c90aec90..f857030f11d 100644 --- a/Makefile +++ b/Makefile @@ -1310,6 +1310,7 @@ LIBGRPC_SRC = \ src/core/ext/xds/xds_certificate_provider.cc \ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client_stats.cc \ + src/core/ext/xds/xds_server_config_fetcher.cc \ src/core/lib/avl/avl.cc \ src/core/lib/backoff/backoff.cc \ src/core/lib/channel/channel_args.cc \ @@ -2819,6 +2820,7 @@ src/core/ext/xds/xds_bootstrap.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_certificate_provider.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_client.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_client_stats.cc: $(OPENSSL_DEP) +src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP) src/core/lib/http/httpcli_security_connector.cc: $(OPENSSL_DEP) src/core/lib/security/authorization/authorization_engine.cc: $(OPENSSL_DEP) src/core/lib/security/authorization/evaluate_args.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 783d6b99b40..69131cd422c 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1141,6 +1141,7 @@ libs: - src/core/ext/xds/xds_certificate_provider.cc - src/core/ext/xds/xds_client.cc - src/core/ext/xds/xds_client_stats.cc + - src/core/ext/xds/xds_server_config_fetcher.cc - src/core/lib/avl/avl.cc - src/core/lib/backoff/backoff.cc - src/core/lib/channel/channel_args.cc @@ -2298,6 +2299,7 @@ libs: - include/grpcpp/support/sync_stream.h - include/grpcpp/support/time.h - include/grpcpp/support/validate_service_config.h + - include/grpcpp/xds_server_builder.h headers: - src/cpp/client/create_channel_internal.h - src/cpp/client/secure_credentials.h diff --git a/config.m4 b/config.m4 index fd8e803601c..0bb8c0acd87 100644 --- a/config.m4 +++ b/config.m4 @@ -317,6 +317,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/xds/xds_certificate_provider.cc \ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client_stats.cc \ + src/core/ext/xds/xds_server_config_fetcher.cc \ src/core/lib/avl/avl.cc \ src/core/lib/backoff/backoff.cc \ src/core/lib/channel/channel_args.cc \ diff --git a/config.w32 b/config.w32 index d7b3c36b78b..5993f087a5c 100644 --- a/config.w32 +++ b/config.w32 @@ -284,6 +284,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\xds\\xds_certificate_provider.cc " + "src\\core\\ext\\xds\\xds_client.cc " + "src\\core\\ext\\xds\\xds_client_stats.cc " + + "src\\core\\ext\\xds\\xds_server_config_fetcher.cc " + "src\\core\\lib\\avl\\avl.cc " + "src\\core\\lib\\backoff\\backoff.cc " + "src\\core\\lib\\channel\\channel_args.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index f3416989ea2..64a7e56b931 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -177,7 +177,8 @@ Pod::Spec.new do |s| 'include/grpcpp/support/stub_options.h', 'include/grpcpp/support/sync_stream.h', 'include/grpcpp/support/time.h', - 'include/grpcpp/support/validate_service_config.h' + 'include/grpcpp/support/validate_service_config.h', + 'include/grpcpp/xds_server_builder.h' end s.subspec 'Implementation' do |ss| diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1fda7408eeb..dd5f73186f5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -722,6 +722,7 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_client.h', 'src/core/ext/xds/xds_client_stats.cc', 'src/core/ext/xds/xds_client_stats.h', + 'src/core/ext/xds/xds_server_config_fetcher.cc', 'src/core/lib/avl/avl.cc', 'src/core/lib/avl/avl.h', 'src/core/lib/backoff/backoff.cc', diff --git a/grpc.def b/grpc.def index dfcbfdbb3f0..ec4f21b6420 100644 --- a/grpc.def +++ b/grpc.def @@ -57,6 +57,9 @@ EXPORTS grpc_server_request_registered_call grpc_server_create grpc_server_register_completion_queue + grpc_server_config_fetcher_xds_create + grpc_server_config_fetcher_destroy + grpc_server_set_config_fetcher grpc_server_add_insecure_http2_port grpc_server_start grpc_server_shutdown_and_notify diff --git a/grpc.gemspec b/grpc.gemspec index b8a85c3e247..9bcc6cbcfb5 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -637,6 +637,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/xds/xds_client.h ) s.files += %w( src/core/ext/xds/xds_client_stats.cc ) s.files += %w( src/core/ext/xds/xds_client_stats.h ) + s.files += %w( src/core/ext/xds/xds_server_config_fetcher.cc ) s.files += %w( src/core/lib/avl/avl.cc ) s.files += %w( src/core/lib/avl/avl.h ) s.files += %w( src/core/lib/backoff/backoff.cc ) diff --git a/grpc.gyp b/grpc.gyp index 5e2688271e9..29fd3f60589 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -729,6 +729,7 @@ 'src/core/ext/xds/xds_certificate_provider.cc', 'src/core/ext/xds/xds_client.cc', 'src/core/ext/xds/xds_client_stats.cc', + 'src/core/ext/xds/xds_server_config_fetcher.cc', 'src/core/lib/avl/avl.cc', 'src/core/lib/backoff/backoff.cc', 'src/core/lib/channel/channel_args.cc', diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 020acdb5edb..09bb1061922 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -411,6 +411,20 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server* server, grpc_completion_queue* cq, void* reserved); +typedef struct grpc_server_config_fetcher grpc_server_config_fetcher; + +/** EXPERIMENTAL. Creates an xDS config fetcher. */ +GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(); + +/** EXPERIMENTAL. Destroys a config fetcher. */ +GRPCAPI void grpc_server_config_fetcher_destroy( + grpc_server_config_fetcher* config_fetcher); + +/** EXPERIMENTAL. Sets the server's config fetcher. Takes ownership. + Must be called before adding ports */ +GRPCAPI void grpc_server_set_config_fetcher( + grpc_server* server, grpc_server_config_fetcher* config_fetcher); + /** Add a HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. REQUIRES: server not started */ diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 65488e8656d..5dc73a19b62 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -179,6 +179,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector> acceptors, + grpc_server_config_fetcher* server_config_fetcher = nullptr, grpc_resource_quota* server_rq = nullptr, std::vector< std::unique_ptr> diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 06a0c8393e7..cb75d87ddd2 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -347,6 +347,11 @@ class ServerBuilder { return option_refs; } + /// Experimental API, subject to change. + void set_fetcher(grpc_server_config_fetcher* server_config_fetcher) { + server_config_fetcher_ = server_config_fetcher; + } + private: friend class ::grpc::testing::ServerBuilderPluginTest; @@ -405,6 +410,7 @@ class ServerBuilder { interceptor_creators_; std::vector> acceptors_; + grpc_server_config_fetcher* server_config_fetcher_ = nullptr; }; } // namespace grpc diff --git a/include/grpcpp/xds_server_builder.h b/include/grpcpp/xds_server_builder.h new file mode 100644 index 00000000000..1ed9e83ac3f --- /dev/null +++ b/include/grpcpp/xds_server_builder.h @@ -0,0 +1,43 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_XDS_SERVER_BUILDER_H +#define GRPCPP_XDS_SERVER_BUILDER_H + +#include + +#include + +namespace grpc { +namespace experimental { + +class XdsServerBuilder : public ::grpc::ServerBuilder { + public: + std::unique_ptr BuildAndStart() override { + grpc_server_config_fetcher* fetcher = + grpc_server_config_fetcher_xds_create(); + if (fetcher == nullptr) return nullptr; + set_fetcher(fetcher); + return ServerBuilder::BuildAndStart(); + } +}; + +} // namespace experimental +} // namespace grpc + +#endif /* GRPCPP_XDS_SERVER_BUILDER_H */ diff --git a/package.xml b/package.xml index 7ab90dfa3e3..3647b18bbef 100644 --- a/package.xml +++ b/package.xml @@ -617,6 +617,7 @@ + diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 65cf84de626..8446a9f9189 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -46,6 +46,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -60,7 +61,7 @@ const char kUnixAbstractUriPrefix[] = "unix-abstract:"; class Chttp2ServerListener : public Server::ListenerInterface { public: - static grpc_error* Create(Server* server, const char* addr, + static grpc_error* Create(Server* server, grpc_resolved_address* addr, grpc_channel_args* args, int* port_num); static grpc_error* CreateWithAcceptor(Server* server, const char* name, @@ -82,6 +83,38 @@ class Chttp2ServerListener : public Server::ListenerInterface { void Orphan() override; private: + class ConfigFetcherWatcher + : public grpc_server_config_fetcher::WatcherInterface { + public: + explicit ConfigFetcherWatcher(Chttp2ServerListener* listener) + : listener_(listener) {} + + void UpdateConfig(grpc_channel_args* args) override { + { + MutexLock lock(&listener_->mu_); + // TODO(yashykt): Fix this + // grpc_channel_args_destroy(listener_->args_); + // listener_->args_ = args; + if (!listener_->shutdown_) return; // Already started listening. + } + int port_temp; + grpc_error* error = grpc_tcp_server_add_port( + listener_->tcp_server_, &listener_->resolved_address_, &port_temp); + if (error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(error); + gpr_log(GPR_ERROR, "Error adding port to server: %s", + grpc_error_string(error)); + // TODO(yashykt): We wouldn't need to assert here if we bound to the + // port earlier during AddPort. + GPR_ASSERT(0); + } + listener_->StartListening(); + } + + private: + Chttp2ServerListener* listener_; + }; + class ConnectionState : public RefCounted { public: ConnectionState(Chttp2ServerListener* listener, @@ -110,6 +143,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { grpc_pollset_set* const interested_parties_; }; + void StartListening(); + static void OnAccept(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor); @@ -124,7 +159,9 @@ class Chttp2ServerListener : public Server::ListenerInterface { Server* const server_; grpc_channel_args* const args_; grpc_tcp_server* tcp_server_; + grpc_resolved_address resolved_address_; Mutex mu_; + ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; bool shutdown_ = true; grpc_closure tcp_server_shutdown_complete_; grpc_closure* on_destroy_done_ = nullptr; @@ -288,81 +325,44 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg, // Chttp2ServerListener // -grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr, +grpc_error* Chttp2ServerListener::Create(Server* server, + grpc_resolved_address* addr, grpc_channel_args* args, int* port_num) { - std::vector error_list; - grpc_resolved_addresses* resolved = nullptr; Chttp2ServerListener* listener = nullptr; // The bulk of this method is inside of a lambda to make cleanup // easier without using goto. grpc_error* error = [&]() { - *port_num = -1; - /* resolve address */ - grpc_error* error = GRPC_ERROR_NONE; - if (absl::StartsWith(addr, kUnixUriPrefix)) { - error = grpc_resolve_unix_domain_address( - addr + sizeof(kUnixUriPrefix) - 1, &resolved); - } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) { - error = grpc_resolve_unix_abstract_domain_address( - addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved); - } else { - error = grpc_blocking_resolve_address(addr, "https", &resolved); - } - if (error != GRPC_ERROR_NONE) return error; // Create Chttp2ServerListener. listener = new Chttp2ServerListener(server, args); error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_); if (error != GRPC_ERROR_NONE) return error; - for (size_t i = 0; i < resolved->naddrs; i++) { - int port_temp; - error = grpc_tcp_server_add_port(listener->tcp_server_, - &resolved->addrs[i], &port_temp); - if (error != GRPC_ERROR_NONE) { - error_list.push_back(error); - } else { - if (*port_num == -1) { - *port_num = port_temp; - } else { - GPR_ASSERT(*port_num == port_temp); - } - } - } - if (error_list.size() == resolved->naddrs) { - std::string msg = - absl::StrFormat("No address added out of total %" PRIuPTR " resolved", - resolved->naddrs); - return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( - msg.c_str(), error_list.data(), error_list.size()); - } else if (!error_list.empty()) { - std::string msg = absl::StrFormat( - "Only %" PRIuPTR " addresses added out of total %" PRIuPTR - " resolved", - resolved->naddrs - error_list.size(), resolved->naddrs); - error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( - msg.c_str(), error_list.data(), error_list.size()); - gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error)); - GRPC_ERROR_UNREF(error); - /* we managed to bind some addresses: continue */ + if (server->config_fetcher() != nullptr) { + listener->resolved_address_ = *addr; + // TODO(yashykt): Consider binding so as to be able to return the port + // number. + } else { + error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num); + if (error != GRPC_ERROR_NONE) return error; } // Create channelz node. if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT)) { + std::string string_address = grpc_sockaddr_to_string(addr, false); listener->channelz_listen_socket_ = MakeRefCounted( - addr, absl::StrFormat("chttp2 listener %s", addr)); + string_address.c_str(), + absl::StrFormat("chttp2 listener %s", string_address.c_str())); } - /* Register with the server only upon success */ + // Register with the server only upon success server->AddListener(OrphanablePtr(listener)); return GRPC_ERROR_NONE; }(); - if (resolved != nullptr) { - grpc_resolved_addresses_destroy(resolved); - } if (error != GRPC_ERROR_NONE) { if (listener != nullptr) { if (listener->tcp_server_ != nullptr) { + // listener is deleted when tcp_server_ is shutdown. grpc_tcp_server_unref(listener->tcp_server_); } else { delete listener; @@ -370,10 +370,6 @@ grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr, } else { grpc_channel_args_destroy(args); } - *port_num = 0; - } - for (grpc_error* error : error_list) { - GRPC_ERROR_UNREF(error); } return error; } @@ -408,13 +404,25 @@ Chttp2ServerListener::~Chttp2ServerListener() { } /* Server callback: start listening on our ports */ -void Chttp2ServerListener::Start(Server* /*server*/, - const std::vector* pollsets) { - { - MutexLock lock(&mu_); - shutdown_ = false; +void Chttp2ServerListener::Start( + Server* /*server*/, const std::vector* /* pollsets */) { + if (server_->config_fetcher() != nullptr) { + auto watcher = absl::make_unique(this); + { + MutexLock lock(&mu_); + config_fetcher_watcher_ = watcher.get(); + } + server_->config_fetcher()->StartWatch( + grpc_sockaddr_to_string(&resolved_address_, false), std::move(watcher)); + } else { + StartListening(); } - grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this); +} + +void Chttp2ServerListener::StartListening() { + grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this); + MutexLock lock(&mu_); + shutdown_ = false; } void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { @@ -483,6 +491,11 @@ void Chttp2ServerListener::TcpServerShutdownComplete(void* arg, /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ void Chttp2ServerListener::Orphan() { + // Cancel the watch before shutting down so as to avoid holding a ref to the + // listener in the watcher. + if (config_fetcher_watcher_ != nullptr) { + server_->config_fetcher()->CancelWatch(config_fetcher_watcher_); + } grpc_tcp_server* tcp_server; { MutexLock lock(&mu_); @@ -505,7 +518,70 @@ grpc_error* Chttp2ServerAddPort(Server* server, const char* addr, return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr, args); } - return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num); + *port_num = -1; + grpc_resolved_addresses* resolved = nullptr; + std::vector error_list; + // Using lambda to avoid use of goto. + grpc_error* error = [&]() { + if (absl::StartsWith(addr, kUnixUriPrefix)) { + error = grpc_resolve_unix_domain_address( + addr + sizeof(kUnixUriPrefix) - 1, &resolved); + } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) { + error = grpc_resolve_unix_abstract_domain_address( + addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved); + } else { + error = grpc_blocking_resolve_address(addr, "https", &resolved); + } + if (error != GRPC_ERROR_NONE) return error; + // Create a listener for each resolved address. + for (size_t i = 0; i < resolved->naddrs; i++) { + // If address has a wildcard port (0), use the same port as a previous + // listener. + if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) { + grpc_sockaddr_set_port(&resolved->addrs[i], *port_num); + } + int port_temp; + error = grpc_core::Chttp2ServerListener::Create( + server, &resolved->addrs[i], grpc_channel_args_copy(args), + &port_temp); + if (error != GRPC_ERROR_NONE) { + error_list.push_back(error); + } else { + if (*port_num == -1) { + *port_num = port_temp; + } else { + GPR_ASSERT(*port_num == port_temp); + } + } + } + if (error_list.size() == resolved->naddrs) { + std::string msg = + absl::StrFormat("No address added out of total %" PRIuPTR " resolved", + resolved->naddrs); + return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( + msg.c_str(), error_list.data(), error_list.size()); + } else if (!error_list.empty()) { + std::string msg = absl::StrFormat( + "Only %" PRIuPTR " addresses added out of total %" PRIuPTR + " resolved", + resolved->naddrs - error_list.size(), resolved->naddrs); + error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( + msg.c_str(), error_list.data(), error_list.size()); + gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + // we managed to bind some addresses: continue without error + } + return GRPC_ERROR_NONE; + }(); // lambda end + for (grpc_error* error : error_list) { + GRPC_ERROR_UNREF(error); + } + grpc_channel_args_destroy(args); + if (resolved != nullptr) { + grpc_resolved_addresses_destroy(resolved); + } + if (error != GRPC_ERROR_NONE) *port_num = 0; + return error; } } // namespace grpc_core diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc new file mode 100644 index 00000000000..5c5e8ee21f1 --- /dev/null +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -0,0 +1,131 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include + +#include "src/core/ext/xds/xds_client.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/server.h" + +namespace grpc_core { +namespace { + +class XdsServerConfigFetcher : public grpc_server_config_fetcher { + public: + explicit XdsServerConfigFetcher(RefCountedPtr xds_client) + : xds_client_(std::move(xds_client)) { + GPR_ASSERT(xds_client_ != nullptr); + } + + void StartWatch(std::string listening_address, + std::unique_ptr + watcher) override { + grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); + auto listener_watcher = + absl::make_unique(std::move(watcher)); + auto* listener_watcher_ptr = listener_watcher.get(); + // TODO(yashykt): Get the resource name id from bootstrap + xds_client_->WatchListenerData( + absl::StrCat("grpc/server?xds.resource.listening_address=", + listening_address), + std::move(listener_watcher)); + MutexLock lock(&mu_); + auto& watcher_state = watchers_[watcher_ptr]; + watcher_state.listening_address = listening_address; + watcher_state.listener_watcher = listener_watcher_ptr; + } + + void CancelWatch( + grpc_server_config_fetcher::WatcherInterface* watcher) override { + MutexLock lock(&mu_); + auto it = watchers_.find(watcher); + if (it != watchers_.end()) { + // Cancel the watch on the listener before erasing + xds_client_->CancelListenerDataWatch(it->second.listening_address, + it->second.listener_watcher, + false /* delay_unsubscription */); + watchers_.erase(it); + } + } + + // Return the interested parties from the xds client so that it can be polled. + grpc_pollset_set* interested_parties() override { + return xds_client_->interested_parties(); + } + + private: + class ListenerWatcher : public XdsClient::ListenerWatcherInterface { + public: + explicit ListenerWatcher( + std::unique_ptr + server_config_watcher) + : server_config_watcher_(std::move(server_config_watcher)) {} + + void OnListenerChanged(XdsApi::LdsUpdate listener) override { + // TODO(yashykt): Construct channel args according to received update + server_config_watcher_->UpdateConfig(nullptr); + } + + void OnError(grpc_error* error) override { + gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this, + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + // TODO(yashykt): We might want to bubble this error to the application. + } + + void OnResourceDoesNotExist() override { + gpr_log(GPR_ERROR, + "ListenerWatcher:%p XdsClient reports requested listener does " + "not exist", + this); + // TODO(yashykt): We might want to bubble this error to the application. + } + + private: + std::unique_ptr + server_config_watcher_; + }; + + struct WatcherState { + std::string listening_address; + ListenerWatcher* listener_watcher = nullptr; + }; + + RefCountedPtr xds_client_; + Mutex mu_; + std::map + watchers_; +}; + +} // namespace +} // namespace grpc_core + +grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ()); + grpc_error* error = GRPC_ERROR_NONE; + grpc_core::RefCountedPtr xds_client = + grpc_core::XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to create xds client: %s", + grpc_error_string(error)); + return nullptr; + } + return new grpc_core::XdsServerConfigFetcher(std::move(xds_client)); +} diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index cbe81596d66..852c93d088f 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -538,6 +538,14 @@ Server::Server(const grpc_channel_args* args) Server::~Server() { grpc_channel_args_destroy(channel_args_); + // Remove the cq pollsets from the config_fetcher. + if (started_ && config_fetcher_ != nullptr && + config_fetcher_->interested_parties() != nullptr) { + for (grpc_pollset* pollset : pollsets_) { + grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(), + pollset); + } + } for (size_t i = 0; i < cqs_.size(); i++) { GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server"); } @@ -571,6 +579,16 @@ void Server::Start() { MutexLock lock(&mu_global_); starting_ = true; } + // Register the interested parties from the config fetcher to the cq pollsets + // before starting listeners so that config fetcher is being polled when the + // listeners start watch the fetcher. + if (config_fetcher_ != nullptr && + config_fetcher_->interested_parties() != nullptr) { + for (grpc_pollset* pollset : pollsets_) { + grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(), + pollset); + } + } for (auto& listener : listeners_) { listener.listener->Start(this, &pollsets_); } @@ -1562,3 +1580,22 @@ grpc_call_error grpc_server_request_registered_call( rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag_new); } + +void grpc_server_set_config_fetcher( + grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_server_set_config_fetcher(server=%p, config_fetcher=%p)", + 2, (server, server_config_fetcher)); + server->core_server->set_config_fetcher( + std::unique_ptr(server_config_fetcher)); +} + +void grpc_server_config_fetcher_destroy( + grpc_server_config_fetcher* server_config_fetcher) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_server_config_fetcher_destroy(config_fetcher=%p)", 1, + (server_config_fetcher)); + delete server_config_fetcher; +} diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 9df7caf7c11..06bdd4d7a9d 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -103,6 +103,15 @@ class Server : public InternallyRefCounted { // result is valid for the lifetime of the server. const std::vector& pollsets() const { return pollsets_; } + grpc_server_config_fetcher* config_fetcher() const { + return config_fetcher_.get(); + } + + void set_config_fetcher( + std::unique_ptr config_fetcher) { + config_fetcher_ = std::move(config_fetcher); + } + bool HasOpenConnections(); // Adds a listener to the server. When the server starts, it will call @@ -350,6 +359,7 @@ class Server : public InternallyRefCounted { grpc_channel_args* const channel_args_; grpc_resource_user* default_resource_user_ = nullptr; RefCountedPtr channelz_node_; + std::unique_ptr config_fetcher_; std::vector cqs_; std::vector pollsets_; @@ -394,4 +404,26 @@ struct grpc_server { grpc_core::OrphanablePtr core_server; }; +// TODO(roth): Eventually, will need a way to modify configuration even after +// a connection is established (e.g., to change things like L7 rate +// limiting, RBAC, and fault injection configs). One possible option +// would be to do something like ServiceConfig and ConfigSelector, but +// that might add unnecessary per-call overhead. Need to consider other +// approaches here. +struct grpc_server_config_fetcher { + public: + class WatcherInterface { + public: + virtual ~WatcherInterface() = default; + virtual void UpdateConfig(grpc_channel_args* args) = 0; + }; + + virtual ~grpc_server_config_fetcher() = default; + + virtual void StartWatch(std::string listening_address, + std::unique_ptr watcher) = 0; + virtual void CancelWatch(WatcherInterface* watcher) = 0; + virtual grpc_pollset_set* interested_parties() = 0; +}; + #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */ diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index a636504082a..dc381257a6e 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -331,7 +331,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { std::unique_ptr server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, - std::move(acceptors_), resource_quota_, + std::move(acceptors_), server_config_fetcher_, resource_quota_, std::move(interceptor_creators_))); ServerInitializer* initializer = server->initializer(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 13357ae4919..2b5486af2f2 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -877,6 +877,7 @@ Server::Server( int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector> acceptors, + grpc_server_config_fetcher* server_config_fetcher, grpc_resource_quota* server_rq, std::vector< std::unique_ptr> @@ -940,6 +941,7 @@ Server::Server( } } server_ = grpc_server_create(&channel_args, nullptr); + grpc_server_set_config_fetcher(server_, server_config_fetcher); } Server::~Server() { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d104aeee405..8c10398328a 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -293,6 +293,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/xds/xds_certificate_provider.cc', 'src/core/ext/xds/xds_client.cc', 'src/core/ext/xds/xds_client_stats.cc', + 'src/core/ext/xds/xds_server_config_fetcher.cc', 'src/core/lib/avl/avl.cc', 'src/core/lib/backoff/backoff.cc', 'src/core/lib/channel/channel_args.cc', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index fa1adb73e30..edc7203633e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -80,6 +80,9 @@ grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; grpc_server_create_type grpc_server_create_import; grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; +grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import; +grpc_server_config_fetcher_destroy_type grpc_server_config_fetcher_destroy_import; +grpc_server_set_config_fetcher_type grpc_server_set_config_fetcher_import; grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; grpc_server_start_type grpc_server_start_import; grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import; @@ -360,6 +363,9 @@ void grpc_rb_load_imports(HMODULE library) { grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create"); grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue"); + grpc_server_config_fetcher_xds_create_import = (grpc_server_config_fetcher_xds_create_type) GetProcAddress(library, "grpc_server_config_fetcher_xds_create"); + grpc_server_config_fetcher_destroy_import = (grpc_server_config_fetcher_destroy_type) GetProcAddress(library, "grpc_server_config_fetcher_destroy"); + grpc_server_set_config_fetcher_import = (grpc_server_set_config_fetcher_type) GetProcAddress(library, "grpc_server_set_config_fetcher"); grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port"); grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start"); grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index d159903956a..8adea95481e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -215,6 +215,15 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server* server, grpc_completion_queue* cq, void* reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import +typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(); +extern grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import; +#define grpc_server_config_fetcher_xds_create grpc_server_config_fetcher_xds_create_import +typedef void(*grpc_server_config_fetcher_destroy_type)(grpc_server_config_fetcher* config_fetcher); +extern grpc_server_config_fetcher_destroy_type grpc_server_config_fetcher_destroy_import; +#define grpc_server_config_fetcher_destroy grpc_server_config_fetcher_destroy_import +typedef void(*grpc_server_set_config_fetcher_type)(grpc_server* server, grpc_server_config_fetcher* config_fetcher); +extern grpc_server_set_config_fetcher_type grpc_server_set_config_fetcher_import; +#define grpc_server_set_config_fetcher grpc_server_set_config_fetcher_import typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server* server, const char* addr); extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index 785948c7b5c..8424b1da2d8 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -126,6 +126,9 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) grpc_server_request_registered_call); printf("%lx", (unsigned long) grpc_server_create); printf("%lx", (unsigned long) grpc_server_register_completion_queue); + printf("%lx", (unsigned long) grpc_server_config_fetcher_xds_create); + printf("%lx", (unsigned long) grpc_server_config_fetcher_destroy); + printf("%lx", (unsigned long) grpc_server_set_config_fetcher); printf("%lx", (unsigned long) grpc_server_add_insecure_http2_port); printf("%lx", (unsigned long) grpc_server_start); printf("%lx", (unsigned long) grpc_server_shutdown_and_notify); diff --git a/test/cpp/end2end/channelz_service_test.cc b/test/cpp/end2end/channelz_service_test.cc index 04f21adb76b..51fccf33c7c 100644 --- a/test/cpp/end2end/channelz_service_test.cc +++ b/test/cpp/end2end/channelz_service_test.cc @@ -746,15 +746,25 @@ TEST_F(ChannelzServerTest, GetServerListenSocketsTest) { &get_server_response); EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); EXPECT_EQ(get_server_response.server_size(), 1); - EXPECT_EQ(get_server_response.server(0).listen_socket_size(), 1); + // The server address gets resolved to two addresses, one for ipv4 and one for + // ipv6, and hence two sockets. + EXPECT_EQ(get_server_response.server(0).listen_socket_size(), 2); GetSocketRequest get_socket_request; GetSocketResponse get_socket_response; get_socket_request.set_socket_id( get_server_response.server(0).listen_socket(0).socket_id()); EXPECT_TRUE( get_server_response.server(0).listen_socket(0).name().find("http")); - ClientContext get_socket_context; - s = channelz_stub_->GetSocket(&get_socket_context, get_socket_request, + ClientContext get_socket_context_1; + s = channelz_stub_->GetSocket(&get_socket_context_1, get_socket_request, + &get_socket_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + get_socket_request.set_socket_id( + get_server_response.server(0).listen_socket(1).socket_id()); + ClientContext get_socket_context_2; + EXPECT_TRUE( + get_server_response.server(0).listen_socket(1).name().find("http")); + s = channelz_stub_->GetSocket(&get_socket_context_2, get_socket_request, &get_socket_response); EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); } diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index c556c64c63f..63c81dd422b 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -1037,7 +1037,8 @@ include/grpcpp/support/string_ref.h \ include/grpcpp/support/stub_options.h \ include/grpcpp/support/sync_stream.h \ include/grpcpp/support/time.h \ -include/grpcpp/support/validate_service_config.h +include/grpcpp/support/validate_service_config.h \ +include/grpcpp/xds_server_builder.h # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 3d841eabfbd..88e25c8402c 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1038,6 +1038,7 @@ include/grpcpp/support/stub_options.h \ include/grpcpp/support/sync_stream.h \ include/grpcpp/support/time.h \ include/grpcpp/support/validate_service_config.h \ +include/grpcpp/xds_server_builder.h \ src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backend_metric.h \ @@ -1569,6 +1570,7 @@ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client.h \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_client_stats.h \ +src/core/ext/xds/xds_server_config_fetcher.cc \ src/core/lib/avl/avl.cc \ src/core/lib/avl/avl.h \ src/core/lib/backoff/backoff.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index e1f1f3935ca..2b0d8bfada8 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1406,6 +1406,7 @@ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client.h \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_client_stats.h \ +src/core/ext/xds/xds_server_config_fetcher.cc \ src/core/lib/README.md \ src/core/lib/avl/avl.cc \ src/core/lib/avl/avl.h \