XdsServerBuilder, config fetching per resolved address and delaying bind/listen till fetch is complete

pull/24956/head
Yash Tibrewal 4 years ago
parent 4a0d59c27a
commit 4105d2ce20
  1. 34
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 2
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 3
      gRPC-C++.podspec
  9. 1
      gRPC-Core.podspec
  10. 3
      grpc.def
  11. 1
      grpc.gemspec
  12. 1
      grpc.gyp
  13. 14
      include/grpc/grpc.h
  14. 1
      include/grpcpp/server.h
  15. 6
      include/grpcpp/server_builder.h
  16. 43
      include/grpcpp/xds_server_builder.h
  17. 1
      package.xml
  18. 196
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  19. 131
      src/core/ext/xds/xds_server_config_fetcher.cc
  20. 37
      src/core/lib/surface/server.cc
  21. 32
      src/core/lib/surface/server.h
  22. 2
      src/cpp/server/server_builder.cc
  23. 2
      src/cpp/server/server_cc.cc
  24. 1
      src/python/grpcio/grpc_core_dependencies.py
  25. 6
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  26. 9
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  27. 3
      test/core/surface/public_headers_must_be_c89.c
  28. 16
      test/cpp/end2end/channelz_service_test.cc
  29. 3
      tools/doxygen/Doxyfile.c++
  30. 2
      tools/doxygen/Doxyfile.c++.internal
  31. 1
      tools/doxygen/Doxyfile.core.internal

34
BUILD

@ -327,6 +327,7 @@ grpc_cc_library(
"grpc_lb_policy_xds_cluster_manager",
"grpc_lb_policy_xds_cluster_resolver",
"grpc_resolver_xds",
"grpc_xds_server_config_fetcher",
],
},
standalone = True,
@ -377,7 +378,8 @@ grpc_cc_library(
select_deps = {
"grpc_no_xds": [],
"//conditions:default": [
"grpc++_xds_credentials",
"grpc++_xds_client",
"grpc++_xds_server",
],
},
standalone = True,
@ -393,16 +395,31 @@ grpc_cc_library(
)
grpc_cc_library(
name = "grpc++_xds_credentials",
name = "grpc++_xds_client",
srcs = [
"src/cpp/client/xds_credentials.cc",
"src/cpp/server/xds_server_credentials.cc",
],
hdrs = [
"src/cpp/client/secure_credentials.h",
],
language = "c++",
deps = [
"grpc++_base",
],
)
grpc_cc_library(
name = "grpc++_xds_server",
srcs = [
"src/cpp/server/xds_server_credentials.cc",
],
hdrs = [
"src/cpp/server/secure_server_credentials.h",
],
language = "c++",
public_hdrs = [
"include/grpcpp/xds_server_builder.h",
],
deps = [
"grpc++_base",
],
@ -1384,6 +1401,17 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_xds_server_config_fetcher",
srcs = [
"src/core/ext/xds/xds_server_config_fetcher.cc",
],
language = "c++",
deps = [
"grpc_xds_client",
],
)
grpc_cc_library(
name = "grpc_google_mesh_ca_certificate_provider_factory",
srcs = [

@ -739,6 +739,7 @@ config("grpc_config") {
"src/core/ext/xds/xds_client.h",
"src/core/ext/xds/xds_client_stats.cc",
"src/core/ext/xds/xds_client_stats.h",
"src/core/ext/xds/xds_server_config_fetcher.cc",
"src/core/lib/avl/avl.cc",
"src/core/lib/avl/avl.h",
"src/core/lib/backoff/backoff.cc",
@ -1427,6 +1428,7 @@ config("grpc_config") {
"include/grpcpp/support/sync_stream.h",
"include/grpcpp/support/time.h",
"include/grpcpp/support/validate_service_config.h",
"include/grpcpp/xds_server_builder.h",
"src/cpp/client/channel_cc.cc",
"src/cpp/client/client_callback.cc",
"src/cpp/client/client_context.cc",

@ -1721,6 +1721,7 @@ add_library(grpc
src/core/ext/xds/xds_certificate_provider.cc
src/core/ext/xds/xds_client.cc
src/core/ext/xds/xds_client_stats.cc
src/core/ext/xds/xds_server_config_fetcher.cc
src/core/lib/avl/avl.cc
src/core/lib/backoff/backoff.cc
src/core/lib/channel/channel_args.cc
@ -2953,6 +2954,7 @@ foreach(_hdr
include/grpcpp/support/sync_stream.h
include/grpcpp/support/time.h
include/grpcpp/support/validate_service_config.h
include/grpcpp/xds_server_builder.h
)
string(REPLACE "include/" "" _path ${_hdr})
get_filename_component(_path ${_path} PATH)

@ -1310,6 +1310,7 @@ LIBGRPC_SRC = \
src/core/ext/xds/xds_certificate_provider.cc \
src/core/ext/xds/xds_client.cc \
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/avl/avl.cc \
src/core/lib/backoff/backoff.cc \
src/core/lib/channel/channel_args.cc \
@ -2819,6 +2820,7 @@ src/core/ext/xds/xds_bootstrap.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_certificate_provider.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_client.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_client_stats.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP)
src/core/lib/http/httpcli_security_connector.cc: $(OPENSSL_DEP)
src/core/lib/security/authorization/authorization_engine.cc: $(OPENSSL_DEP)
src/core/lib/security/authorization/evaluate_args.cc: $(OPENSSL_DEP)

@ -1141,6 +1141,7 @@ libs:
- src/core/ext/xds/xds_certificate_provider.cc
- src/core/ext/xds/xds_client.cc
- src/core/ext/xds/xds_client_stats.cc
- src/core/ext/xds/xds_server_config_fetcher.cc
- src/core/lib/avl/avl.cc
- src/core/lib/backoff/backoff.cc
- src/core/lib/channel/channel_args.cc
@ -2298,6 +2299,7 @@ libs:
- include/grpcpp/support/sync_stream.h
- include/grpcpp/support/time.h
- include/grpcpp/support/validate_service_config.h
- include/grpcpp/xds_server_builder.h
headers:
- src/cpp/client/create_channel_internal.h
- src/cpp/client/secure_credentials.h

@ -317,6 +317,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/xds/xds_certificate_provider.cc \
src/core/ext/xds/xds_client.cc \
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/avl/avl.cc \
src/core/lib/backoff/backoff.cc \
src/core/lib/channel/channel_args.cc \

@ -284,6 +284,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\xds\\xds_certificate_provider.cc " +
"src\\core\\ext\\xds\\xds_client.cc " +
"src\\core\\ext\\xds\\xds_client_stats.cc " +
"src\\core\\ext\\xds\\xds_server_config_fetcher.cc " +
"src\\core\\lib\\avl\\avl.cc " +
"src\\core\\lib\\backoff\\backoff.cc " +
"src\\core\\lib\\channel\\channel_args.cc " +

@ -177,7 +177,8 @@ Pod::Spec.new do |s|
'include/grpcpp/support/stub_options.h',
'include/grpcpp/support/sync_stream.h',
'include/grpcpp/support/time.h',
'include/grpcpp/support/validate_service_config.h'
'include/grpcpp/support/validate_service_config.h',
'include/grpcpp/xds_server_builder.h'
end
s.subspec 'Implementation' do |ss|

@ -722,6 +722,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_client.h',
'src/core/ext/xds/xds_client_stats.cc',
'src/core/ext/xds/xds_client_stats.h',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/avl/avl.cc',
'src/core/lib/avl/avl.h',
'src/core/lib/backoff/backoff.cc',

@ -57,6 +57,9 @@ EXPORTS
grpc_server_request_registered_call
grpc_server_create
grpc_server_register_completion_queue
grpc_server_config_fetcher_xds_create
grpc_server_config_fetcher_destroy
grpc_server_set_config_fetcher
grpc_server_add_insecure_http2_port
grpc_server_start
grpc_server_shutdown_and_notify

@ -637,6 +637,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/xds/xds_client.h )
s.files += %w( src/core/ext/xds/xds_client_stats.cc )
s.files += %w( src/core/ext/xds/xds_client_stats.h )
s.files += %w( src/core/ext/xds/xds_server_config_fetcher.cc )
s.files += %w( src/core/lib/avl/avl.cc )
s.files += %w( src/core/lib/avl/avl.h )
s.files += %w( src/core/lib/backoff/backoff.cc )

@ -729,6 +729,7 @@
'src/core/ext/xds/xds_certificate_provider.cc',
'src/core/ext/xds/xds_client.cc',
'src/core/ext/xds/xds_client_stats.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/avl/avl.cc',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/channel/channel_args.cc',

@ -411,6 +411,20 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server* server,
grpc_completion_queue* cq,
void* reserved);
typedef struct grpc_server_config_fetcher grpc_server_config_fetcher;
/** EXPERIMENTAL. Creates an xDS config fetcher. */
GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create();
/** EXPERIMENTAL. Destroys a config fetcher. */
GRPCAPI void grpc_server_config_fetcher_destroy(
grpc_server_config_fetcher* config_fetcher);
/** EXPERIMENTAL. Sets the server's config fetcher. Takes ownership.
Must be called before adding ports */
GRPCAPI void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* config_fetcher);
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */

@ -179,6 +179,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_server_config_fetcher* server_config_fetcher = nullptr,
grpc_resource_quota* server_rq = nullptr,
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>

@ -347,6 +347,11 @@ class ServerBuilder {
return option_refs;
}
/// Experimental API, subject to change.
void set_fetcher(grpc_server_config_fetcher* server_config_fetcher) {
server_config_fetcher_ = server_config_fetcher;
}
private:
friend class ::grpc::testing::ServerBuilderPluginTest;
@ -405,6 +410,7 @@ class ServerBuilder {
interceptor_creators_;
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors_;
grpc_server_config_fetcher* server_config_fetcher_ = nullptr;
};
} // namespace grpc

@ -0,0 +1,43 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPCPP_XDS_SERVER_BUILDER_H
#define GRPCPP_XDS_SERVER_BUILDER_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpcpp/server_builder.h>
namespace grpc {
namespace experimental {
class XdsServerBuilder : public ::grpc::ServerBuilder {
public:
std::unique_ptr<Server> BuildAndStart() override {
grpc_server_config_fetcher* fetcher =
grpc_server_config_fetcher_xds_create();
if (fetcher == nullptr) return nullptr;
set_fetcher(fetcher);
return ServerBuilder::BuildAndStart();
}
};
} // namespace experimental
} // namespace grpc
#endif /* GRPCPP_XDS_SERVER_BUILDER_H */

@ -617,6 +617,7 @@
<file baseinstalldir="/" name="src/core/ext/xds/xds_client.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_client_stats.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_client_stats.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_server_config_fetcher.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/avl/avl.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/avl/avl.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/backoff/backoff.cc" role="src" />

@ -46,6 +46,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
@ -60,7 +61,7 @@ const char kUnixAbstractUriPrefix[] = "unix-abstract:";
class Chttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error* Create(Server* server, const char* addr,
static grpc_error* Create(Server* server, grpc_resolved_address* addr,
grpc_channel_args* args, int* port_num);
static grpc_error* CreateWithAcceptor(Server* server, const char* name,
@ -82,6 +83,38 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Orphan() override;
private:
class ConfigFetcherWatcher
: public grpc_server_config_fetcher::WatcherInterface {
public:
explicit ConfigFetcherWatcher(Chttp2ServerListener* listener)
: listener_(listener) {}
void UpdateConfig(grpc_channel_args* args) override {
{
MutexLock lock(&listener_->mu_);
// TODO(yashykt): Fix this
// grpc_channel_args_destroy(listener_->args_);
// listener_->args_ = args;
if (!listener_->shutdown_) return; // Already started listening.
}
int port_temp;
grpc_error* error = grpc_tcp_server_add_port(
listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
if (error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
gpr_log(GPR_ERROR, "Error adding port to server: %s",
grpc_error_string(error));
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT(0);
}
listener_->StartListening();
}
private:
Chttp2ServerListener* listener_;
};
class ConnectionState : public RefCounted<ConnectionState> {
public:
ConnectionState(Chttp2ServerListener* listener,
@ -110,6 +143,8 @@ class Chttp2ServerListener : public Server::ListenerInterface {
grpc_pollset_set* const interested_parties_;
};
void StartListening();
static void OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
@ -124,7 +159,9 @@ class Chttp2ServerListener : public Server::ListenerInterface {
Server* const server_;
grpc_channel_args* const args_;
grpc_tcp_server* tcp_server_;
grpc_resolved_address resolved_address_;
Mutex mu_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
bool shutdown_ = true;
grpc_closure tcp_server_shutdown_complete_;
grpc_closure* on_destroy_done_ = nullptr;
@ -288,81 +325,44 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
// Chttp2ServerListener
//
grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr,
grpc_error* Chttp2ServerListener::Create(Server* server,
grpc_resolved_address* addr,
grpc_channel_args* args,
int* port_num) {
std::vector<grpc_error*> error_list;
grpc_resolved_addresses* resolved = nullptr;
Chttp2ServerListener* listener = nullptr;
// The bulk of this method is inside of a lambda to make cleanup
// easier without using goto.
grpc_error* error = [&]() {
*port_num = -1;
/* resolve address */
grpc_error* error = GRPC_ERROR_NONE;
if (absl::StartsWith(addr, kUnixUriPrefix)) {
error = grpc_resolve_unix_domain_address(
addr + sizeof(kUnixUriPrefix) - 1, &resolved);
} else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
error = grpc_resolve_unix_abstract_domain_address(
addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
} else {
error = grpc_blocking_resolve_address(addr, "https", &resolved);
}
if (error != GRPC_ERROR_NONE) return error;
// Create Chttp2ServerListener.
listener = new Chttp2ServerListener(server, args);
error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args, &listener->tcp_server_);
if (error != GRPC_ERROR_NONE) return error;
for (size_t i = 0; i < resolved->naddrs; i++) {
int port_temp;
error = grpc_tcp_server_add_port(listener->tcp_server_,
&resolved->addrs[i], &port_temp);
if (error != GRPC_ERROR_NONE) {
error_list.push_back(error);
} else {
if (*port_num == -1) {
*port_num = port_temp;
if (server->config_fetcher() != nullptr) {
listener->resolved_address_ = *addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
} else {
GPR_ASSERT(*port_num == port_temp);
}
}
}
if (error_list.size() == resolved->naddrs) {
std::string msg =
absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
resolved->naddrs);
return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved->naddrs - error_list.size(), resolved->naddrs);
error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
GRPC_ERROR_UNREF(error);
/* we managed to bind some addresses: continue */
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
if (error != GRPC_ERROR_NONE) return error;
}
// Create channelz node.
if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
GRPC_ENABLE_CHANNELZ_DEFAULT)) {
std::string string_address = grpc_sockaddr_to_string(addr, false);
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
addr, absl::StrFormat("chttp2 listener %s", addr));
string_address.c_str(),
absl::StrFormat("chttp2 listener %s", string_address.c_str()));
}
/* Register with the server only upon success */
// Register with the server only upon success
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return GRPC_ERROR_NONE;
}();
if (resolved != nullptr) {
grpc_resolved_addresses_destroy(resolved);
}
if (error != GRPC_ERROR_NONE) {
if (listener != nullptr) {
if (listener->tcp_server_ != nullptr) {
// listener is deleted when tcp_server_ is shutdown.
grpc_tcp_server_unref(listener->tcp_server_);
} else {
delete listener;
@ -370,10 +370,6 @@ grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr,
} else {
grpc_channel_args_destroy(args);
}
*port_num = 0;
}
for (grpc_error* error : error_list) {
GRPC_ERROR_UNREF(error);
}
return error;
}
@ -408,13 +404,25 @@ Chttp2ServerListener::~Chttp2ServerListener() {
}
/* Server callback: start listening on our ports */
void Chttp2ServerListener::Start(Server* /*server*/,
const std::vector<grpc_pollset*>* pollsets) {
void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (server_->config_fetcher() != nullptr) {
auto watcher = absl::make_unique<ConfigFetcherWatcher>(this);
{
MutexLock lock(&mu_);
shutdown_ = false;
config_fetcher_watcher_ = watcher.get();
}
server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false), std::move(watcher));
} else {
StartListening();
}
grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
}
void Chttp2ServerListener::StartListening() {
grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
MutexLock lock(&mu_);
shutdown_ = false;
}
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
@ -483,6 +491,11 @@ void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
void Chttp2ServerListener::Orphan() {
// Cancel the watch before shutting down so as to avoid holding a ref to the
// listener in the watcher.
if (config_fetcher_watcher_ != nullptr) {
server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
}
grpc_tcp_server* tcp_server;
{
MutexLock lock(&mu_);
@ -505,7 +518,70 @@ grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
args);
}
return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
*port_num = -1;
grpc_resolved_addresses* resolved = nullptr;
std::vector<grpc_error*> error_list;
// Using lambda to avoid use of goto.
grpc_error* error = [&]() {
if (absl::StartsWith(addr, kUnixUriPrefix)) {
error = grpc_resolve_unix_domain_address(
addr + sizeof(kUnixUriPrefix) - 1, &resolved);
} else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
error = grpc_resolve_unix_abstract_domain_address(
addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
} else {
error = grpc_blocking_resolve_address(addr, "https", &resolved);
}
if (error != GRPC_ERROR_NONE) return error;
// Create a listener for each resolved address.
for (size_t i = 0; i < resolved->naddrs; i++) {
// If address has a wildcard port (0), use the same port as a previous
// listener.
if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
}
int port_temp;
error = grpc_core::Chttp2ServerListener::Create(
server, &resolved->addrs[i], grpc_channel_args_copy(args),
&port_temp);
if (error != GRPC_ERROR_NONE) {
error_list.push_back(error);
} else {
if (*port_num == -1) {
*port_num = port_temp;
} else {
GPR_ASSERT(*port_num == port_temp);
}
}
}
if (error_list.size() == resolved->naddrs) {
std::string msg =
absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
resolved->naddrs);
return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved->naddrs - error_list.size(), resolved->naddrs);
error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
GRPC_ERROR_UNREF(error);
// we managed to bind some addresses: continue without error
}
return GRPC_ERROR_NONE;
}(); // lambda end
for (grpc_error* error : error_list) {
GRPC_ERROR_UNREF(error);
}
grpc_channel_args_destroy(args);
if (resolved != nullptr) {
grpc_resolved_addresses_destroy(resolved);
}
if (error != GRPC_ERROR_NONE) *port_num = 0;
return error;
}
} // namespace grpc_core

@ -0,0 +1,131 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_client.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
namespace grpc_core {
namespace {
class XdsServerConfigFetcher : public grpc_server_config_fetcher {
public:
explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
: xds_client_(std::move(xds_client)) {
GPR_ASSERT(xds_client_ != nullptr);
}
void StartWatch(std::string listening_address,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
watcher) override {
grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
auto listener_watcher =
absl::make_unique<ListenerWatcher>(std::move(watcher));
auto* listener_watcher_ptr = listener_watcher.get();
// TODO(yashykt): Get the resource name id from bootstrap
xds_client_->WatchListenerData(
absl::StrCat("grpc/server?xds.resource.listening_address=",
listening_address),
std::move(listener_watcher));
MutexLock lock(&mu_);
auto& watcher_state = watchers_[watcher_ptr];
watcher_state.listening_address = listening_address;
watcher_state.listener_watcher = listener_watcher_ptr;
}
void CancelWatch(
grpc_server_config_fetcher::WatcherInterface* watcher) override {
MutexLock lock(&mu_);
auto it = watchers_.find(watcher);
if (it != watchers_.end()) {
// Cancel the watch on the listener before erasing
xds_client_->CancelListenerDataWatch(it->second.listening_address,
it->second.listener_watcher,
false /* delay_unsubscription */);
watchers_.erase(it);
}
}
// Return the interested parties from the xds client so that it can be polled.
grpc_pollset_set* interested_parties() override {
return xds_client_->interested_parties();
}
private:
class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
public:
explicit ListenerWatcher(
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher)
: server_config_watcher_(std::move(server_config_watcher)) {}
void OnListenerChanged(XdsApi::LdsUpdate listener) override {
// TODO(yashykt): Construct channel args according to received update
server_config_watcher_->UpdateConfig(nullptr);
}
void OnError(grpc_error* error) override {
gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
// TODO(yashykt): We might want to bubble this error to the application.
}
void OnResourceDoesNotExist() override {
gpr_log(GPR_ERROR,
"ListenerWatcher:%p XdsClient reports requested listener does "
"not exist",
this);
// TODO(yashykt): We might want to bubble this error to the application.
}
private:
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher_;
};
struct WatcherState {
std::string listening_address;
ListenerWatcher* listener_watcher = nullptr;
};
RefCountedPtr<XdsClient> xds_client_;
Mutex mu_;
std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
watchers_;
};
} // namespace
} // namespace grpc_core
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
grpc_core::XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to create xds client: %s",
grpc_error_string(error));
return nullptr;
}
return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
}

@ -538,6 +538,14 @@ Server::Server(const grpc_channel_args* args)
Server::~Server() {
grpc_channel_args_destroy(channel_args_);
// Remove the cq pollsets from the config_fetcher.
if (started_ && config_fetcher_ != nullptr &&
config_fetcher_->interested_parties() != nullptr) {
for (grpc_pollset* pollset : pollsets_) {
grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(),
pollset);
}
}
for (size_t i = 0; i < cqs_.size(); i++) {
GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server");
}
@ -571,6 +579,16 @@ void Server::Start() {
MutexLock lock(&mu_global_);
starting_ = true;
}
// Register the interested parties from the config fetcher to the cq pollsets
// before starting listeners so that config fetcher is being polled when the
// listeners start watch the fetcher.
if (config_fetcher_ != nullptr &&
config_fetcher_->interested_parties() != nullptr) {
for (grpc_pollset* pollset : pollsets_) {
grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(),
pollset);
}
}
for (auto& listener : listeners_) {
listener.listener->Start(this, &pollsets_);
}
@ -1562,3 +1580,22 @@ grpc_call_error grpc_server_request_registered_call(
rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call,
cq_for_notification, tag_new);
}
void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_set_config_fetcher(server=%p, config_fetcher=%p)",
2, (server, server_config_fetcher));
server->core_server->set_config_fetcher(
std::unique_ptr<grpc_server_config_fetcher>(server_config_fetcher));
}
void grpc_server_config_fetcher_destroy(
grpc_server_config_fetcher* server_config_fetcher) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_config_fetcher_destroy(config_fetcher=%p)", 1,
(server_config_fetcher));
delete server_config_fetcher;
}

@ -103,6 +103,15 @@ class Server : public InternallyRefCounted<Server> {
// result is valid for the lifetime of the server.
const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
grpc_server_config_fetcher* config_fetcher() const {
return config_fetcher_.get();
}
void set_config_fetcher(
std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
config_fetcher_ = std::move(config_fetcher);
}
bool HasOpenConnections();
// Adds a listener to the server. When the server starts, it will call
@ -350,6 +359,7 @@ class Server : public InternallyRefCounted<Server> {
grpc_channel_args* const channel_args_;
grpc_resource_user* default_resource_user_ = nullptr;
RefCountedPtr<channelz::ServerNode> channelz_node_;
std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
std::vector<grpc_completion_queue*> cqs_;
std::vector<grpc_pollset*> pollsets_;
@ -394,4 +404,26 @@ struct grpc_server {
grpc_core::OrphanablePtr<grpc_core::Server> core_server;
};
// TODO(roth): Eventually, will need a way to modify configuration even after
// a connection is established (e.g., to change things like L7 rate
// limiting, RBAC, and fault injection configs). One possible option
// would be to do something like ServiceConfig and ConfigSelector, but
// that might add unnecessary per-call overhead. Need to consider other
// approaches here.
struct grpc_server_config_fetcher {
public:
class WatcherInterface {
public:
virtual ~WatcherInterface() = default;
virtual void UpdateConfig(grpc_channel_args* args) = 0;
};
virtual ~grpc_server_config_fetcher() = default;
virtual void StartWatch(std::string listening_address,
std::unique_ptr<WatcherInterface> watcher) = 0;
virtual void CancelWatch(WatcherInterface* watcher) = 0;
virtual grpc_pollset_set* interested_parties() = 0;
};
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */

@ -331,7 +331,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<grpc::Server> server(new grpc::Server(
&args, sync_server_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
std::move(acceptors_), resource_quota_,
std::move(acceptors_), server_config_fetcher_, resource_quota_,
std::move(interceptor_creators_)));
ServerInitializer* initializer = server->initializer();

@ -877,6 +877,7 @@ Server::Server(
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_server_config_fetcher* server_config_fetcher,
grpc_resource_quota* server_rq,
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
@ -940,6 +941,7 @@ Server::Server(
}
}
server_ = grpc_server_create(&channel_args, nullptr);
grpc_server_set_config_fetcher(server_, server_config_fetcher);
}
Server::~Server() {

@ -293,6 +293,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/xds/xds_certificate_provider.cc',
'src/core/ext/xds/xds_client.cc',
'src/core/ext/xds/xds_client_stats.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/avl/avl.cc',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/channel/channel_args.cc',

@ -80,6 +80,9 @@ grpc_server_register_method_type grpc_server_register_method_import;
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import;
grpc_server_config_fetcher_destroy_type grpc_server_config_fetcher_destroy_import;
grpc_server_set_config_fetcher_type grpc_server_set_config_fetcher_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@ -360,6 +363,9 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
grpc_server_config_fetcher_xds_create_import = (grpc_server_config_fetcher_xds_create_type) GetProcAddress(library, "grpc_server_config_fetcher_xds_create");
grpc_server_config_fetcher_destroy_import = (grpc_server_config_fetcher_destroy_type) GetProcAddress(library, "grpc_server_config_fetcher_destroy");
grpc_server_set_config_fetcher_import = (grpc_server_set_config_fetcher_type) GetProcAddress(library, "grpc_server_set_config_fetcher");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");

@ -215,6 +215,15 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server* server, grpc_completion_queue* cq, void* reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)();
extern grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import;
#define grpc_server_config_fetcher_xds_create grpc_server_config_fetcher_xds_create_import
typedef void(*grpc_server_config_fetcher_destroy_type)(grpc_server_config_fetcher* config_fetcher);
extern grpc_server_config_fetcher_destroy_type grpc_server_config_fetcher_destroy_import;
#define grpc_server_config_fetcher_destroy grpc_server_config_fetcher_destroy_import
typedef void(*grpc_server_set_config_fetcher_type)(grpc_server* server, grpc_server_config_fetcher* config_fetcher);
extern grpc_server_set_config_fetcher_type grpc_server_set_config_fetcher_import;
#define grpc_server_set_config_fetcher grpc_server_set_config_fetcher_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server* server, const char* addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import

@ -126,6 +126,9 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_server_request_registered_call);
printf("%lx", (unsigned long) grpc_server_create);
printf("%lx", (unsigned long) grpc_server_register_completion_queue);
printf("%lx", (unsigned long) grpc_server_config_fetcher_xds_create);
printf("%lx", (unsigned long) grpc_server_config_fetcher_destroy);
printf("%lx", (unsigned long) grpc_server_set_config_fetcher);
printf("%lx", (unsigned long) grpc_server_add_insecure_http2_port);
printf("%lx", (unsigned long) grpc_server_start);
printf("%lx", (unsigned long) grpc_server_shutdown_and_notify);

@ -746,15 +746,25 @@ TEST_F(ChannelzServerTest, GetServerListenSocketsTest) {
&get_server_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_server_response.server_size(), 1);
EXPECT_EQ(get_server_response.server(0).listen_socket_size(), 1);
// The server address gets resolved to two addresses, one for ipv4 and one for
// ipv6, and hence two sockets.
EXPECT_EQ(get_server_response.server(0).listen_socket_size(), 2);
GetSocketRequest get_socket_request;
GetSocketResponse get_socket_response;
get_socket_request.set_socket_id(
get_server_response.server(0).listen_socket(0).socket_id());
EXPECT_TRUE(
get_server_response.server(0).listen_socket(0).name().find("http"));
ClientContext get_socket_context;
s = channelz_stub_->GetSocket(&get_socket_context, get_socket_request,
ClientContext get_socket_context_1;
s = channelz_stub_->GetSocket(&get_socket_context_1, get_socket_request,
&get_socket_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
get_socket_request.set_socket_id(
get_server_response.server(0).listen_socket(1).socket_id());
ClientContext get_socket_context_2;
EXPECT_TRUE(
get_server_response.server(0).listen_socket(1).name().find("http"));
s = channelz_stub_->GetSocket(&get_socket_context_2, get_socket_request,
&get_socket_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
}

@ -1037,7 +1037,8 @@ include/grpcpp/support/string_ref.h \
include/grpcpp/support/stub_options.h \
include/grpcpp/support/sync_stream.h \
include/grpcpp/support/time.h \
include/grpcpp/support/validate_service_config.h
include/grpcpp/support/validate_service_config.h \
include/grpcpp/xds_server_builder.h
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

@ -1038,6 +1038,7 @@ include/grpcpp/support/stub_options.h \
include/grpcpp/support/sync_stream.h \
include/grpcpp/support/time.h \
include/grpcpp/support/validate_service_config.h \
include/grpcpp/xds_server_builder.h \
src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/client_channel/backend_metric.cc \
src/core/ext/filters/client_channel/backend_metric.h \
@ -1569,6 +1570,7 @@ src/core/ext/xds/xds_client.cc \
src/core/ext/xds/xds_client.h \
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_client_stats.h \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/avl/avl.cc \
src/core/lib/avl/avl.h \
src/core/lib/backoff/backoff.cc \

@ -1406,6 +1406,7 @@ src/core/ext/xds/xds_client.cc \
src/core/ext/xds/xds_client.h \
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_client_stats.h \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/README.md \
src/core/lib/avl/avl.cc \
src/core/lib/avl/avl.h \

Loading…
Cancel
Save