Merge pull request #24607 from markdroth/xds_api_stateless

xds: Move use_v3 out of XdsApi, so that we can specify it on a per-server basis.
pull/24637/head
Mark D. Roth 4 years ago committed by GitHub
commit 04045e2273
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/core/ext/xds/xds_api.cc
  2. 10
      src/core/ext/xds/xds_api.h
  3. 71
      src/core/ext/xds/xds_client.cc
  4. 4
      src/core/ext/xds/xds_client.h

@ -552,11 +552,10 @@ bool IsEds(absl::string_view type_url) {
} // namespace } // namespace
XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer, XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer,
const XdsBootstrap* bootstrap) const XdsBootstrap::Node* node)
: client_(client), : client_(client),
tracer_(tracer), tracer_(tracer),
use_v3_(bootstrap != nullptr && bootstrap->server().ShouldUseV3()), node_(node),
bootstrap_(bootstrap),
build_version_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ", build_version_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ",
grpc_version_string())), grpc_version_string())),
user_agent_name_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) {} user_agent_name_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) {}
@ -657,11 +656,10 @@ void PopulateBuildVersion(upb_arena* arena, envoy_config_core_v3_Node* node_msg,
encoded_build_version.size(), arena); encoded_build_version.size(), arena);
} }
void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap, void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, bool use_v3,
const std::string& build_version, const std::string& build_version,
const std::string& user_agent_name, const std::string& user_agent_name,
envoy_config_core_v3_Node* node_msg) { envoy_config_core_v3_Node* node_msg) {
const XdsBootstrap::Node* node = bootstrap->node();
if (node != nullptr) { if (node != nullptr) {
if (!node->id.empty()) { if (!node->id.empty()) {
envoy_config_core_v3_Node_set_id(node_msg, envoy_config_core_v3_Node_set_id(node_msg,
@ -694,7 +692,7 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap,
} }
} }
} }
if (!bootstrap->server().ShouldUseV3()) { if (!use_v3) {
PopulateBuildVersion(arena, node_msg, build_version); PopulateBuildVersion(arena, node_msg, build_version);
} }
envoy_config_core_v3_Node_set_user_agent_name( envoy_config_core_v3_Node_set_user_agent_name(
@ -758,7 +756,7 @@ absl::string_view TypeUrlExternalToInternal(bool use_v3,
} // namespace } // namespace
grpc_slice XdsApi::CreateAdsRequest( grpc_slice XdsApi::CreateAdsRequest(
const std::string& type_url, const XdsBootstrap::XdsServer& server, const std::string& type_url,
const std::set<absl::string_view>& resource_names, const std::set<absl::string_view>& resource_names,
const std::string& version, const std::string& nonce, grpc_error* error, const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node) { bool populate_node) {
@ -768,7 +766,7 @@ grpc_slice XdsApi::CreateAdsRequest(
envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr()); envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr());
// Set type_url. // Set type_url.
absl::string_view real_type_url = absl::string_view real_type_url =
TypeUrlExternalToInternal(use_v3_, type_url); TypeUrlExternalToInternal(server.ShouldUseV3(), type_url);
envoy_service_discovery_v3_DiscoveryRequest_set_type_url( envoy_service_discovery_v3_DiscoveryRequest_set_type_url(
request, StdStringToUpbString(real_type_url)); request, StdStringToUpbString(real_type_url));
// Set version_info. // Set version_info.
@ -805,8 +803,8 @@ grpc_slice XdsApi::CreateAdsRequest(
envoy_config_core_v3_Node* node_msg = envoy_config_core_v3_Node* node_msg =
envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request, envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request,
arena.ptr()); arena.ptr());
PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_,
node_msg); user_agent_name_, node_msg);
} }
// Add resource_names. // Add resource_names.
for (const auto& resource_name : resource_names) { for (const auto& resource_name : resource_names) {
@ -1907,7 +1905,8 @@ grpc_slice SerializeLrsRequest(
} // namespace } // namespace
grpc_slice XdsApi::CreateLrsInitialRequest() { grpc_slice XdsApi::CreateLrsInitialRequest(
const XdsBootstrap::XdsServer& server) {
upb::Arena arena; upb::Arena arena;
// Create a request. // Create a request.
envoy_service_load_stats_v3_LoadStatsRequest* request = envoy_service_load_stats_v3_LoadStatsRequest* request =
@ -1916,8 +1915,8 @@ grpc_slice XdsApi::CreateLrsInitialRequest() {
envoy_config_core_v3_Node* node_msg = envoy_config_core_v3_Node* node_msg =
envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request, envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
arena.ptr()); arena.ptr());
PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_,
node_msg); user_agent_name_, node_msg);
envoy_config_core_v3_Node_add_client_features( envoy_config_core_v3_Node_add_client_features(
node_msg, upb_strview_makez("envoy.lrs.supports_send_all_clusters"), node_msg, upb_strview_makez("envoy.lrs.supports_send_all_clusters"),
arena.ptr()); arena.ptr());

@ -363,11 +363,12 @@ class XdsApi {
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
ClusterLoadReport>; ClusterLoadReport>;
XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap* bootstrap); XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node);
// Creates an ADS request. // Creates an ADS request.
// Takes ownership of \a error. // Takes ownership of \a error.
grpc_slice CreateAdsRequest(const std::string& type_url, grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server,
const std::string& type_url,
const std::set<absl::string_view>& resource_names, const std::set<absl::string_view>& resource_names,
const std::string& version, const std::string& version,
const std::string& nonce, grpc_error* error, const std::string& nonce, grpc_error* error,
@ -394,7 +395,7 @@ class XdsApi {
const std::set<absl::string_view>& expected_eds_service_names); const std::set<absl::string_view>& expected_eds_service_names);
// Creates an initial LRS request. // Creates an initial LRS request.
grpc_slice CreateLrsInitialRequest(); grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server);
// Creates an LRS request sending a client-side load report. // Creates an LRS request sending a client-side load report.
grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map);
@ -410,8 +411,7 @@ class XdsApi {
private: private:
XdsClient* client_; XdsClient* client_;
TraceFlag* tracer_; TraceFlag* tracer_;
const bool use_v3_; const XdsBootstrap::Node* node_; // Do not own.
const XdsBootstrap* bootstrap_; // Do not own.
upb::SymbolTable symtab_; upb::SymbolTable symtab_;
const std::string build_version_; const std::string build_version_;
const std::string user_agent_name_; const std::string user_agent_name_;

@ -431,13 +431,40 @@ class XdsClient::ChannelState::StateWatcher
// XdsClient::ChannelState // XdsClient::ChannelState
// //
namespace {
grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
// Build channel args.
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
g_channel_args, args_to_add.data(), args_to_add.size());
// Create channel.
grpc_channel* channel = grpc_secure_channel_create(
server.channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
}
} // namespace
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client, XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
grpc_channel* channel) const XdsBootstrap::XdsServer& server)
: InternallyRefCounted<ChannelState>( : InternallyRefCounted<ChannelState>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState" GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
: nullptr), : nullptr),
xds_client_(std::move(xds_client)), xds_client_(std::move(xds_client)),
channel_(channel) { server_(server) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
xds_client_.get(), server.server_uri.c_str());
}
channel_ = CreateXdsChannel(server);
GPR_ASSERT(channel_ != nullptr); GPR_ASSERT(channel_ != nullptr);
StartConnectivityWatchLocked(); StartConnectivityWatchLocked();
} }
@ -646,7 +673,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
GPR_ASSERT(xds_client() != nullptr); GPR_ASSERT(xds_client() != nullptr);
// Create a call with the specified method name. // Create a call with the specified method name.
const auto& method = const auto& method =
xds_client()->bootstrap_->server().ShouldUseV3() chand()->server_.ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES; : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
call_ = grpc_channel_create_pollset_set_call( call_ = grpc_channel_create_pollset_set_call(
@ -767,8 +794,9 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
std::set<absl::string_view> resource_names = std::set<absl::string_view> resource_names =
ResourceNamesForRequest(type_url); ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest( request_payload_slice = xds_client()->api_.CreateAdsRequest(
type_url, resource_names, xds_client()->resource_version_map_[type_url], chand()->server_, type_url, resource_names,
state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); xds_client()->resource_version_map_[type_url], state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
state_map_.erase(type_url); state_map_.erase(type_url);
@ -1401,7 +1429,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
// the polling entities from client_channel. // the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr); GPR_ASSERT(xds_client() != nullptr);
const auto& method = const auto& method =
xds_client()->bootstrap_->server().ShouldUseV3() chand()->server_.ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS; : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
call_ = grpc_channel_create_pollset_set_call( call_ = grpc_channel_create_pollset_set_call(
@ -1411,7 +1439,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
GPR_ASSERT(call_ != nullptr); GPR_ASSERT(call_ != nullptr);
// Init the request payload. // Init the request payload.
grpc_slice request_payload_slice = grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsInitialRequest(); xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
send_message_payload_ = send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice); grpc_slice_unref_internal(request_payload_slice);
@ -1702,25 +1730,6 @@ grpc_millis GetRequestTimeout() {
{15000, 0, INT_MAX}); {15000, 0, INT_MAX});
} }
grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap) {
// Build channel args.
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
g_channel_args, args_to_add.data(), args_to_add.size());
// Create channel.
grpc_channel* channel = grpc_secure_channel_create(
bootstrap.server().channel_creds.get(),
bootstrap.server().server_uri.c_str(), new_args, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
}
} // namespace } // namespace
XdsClient::XdsClient(grpc_error** error) XdsClient::XdsClient(grpc_error** error)
@ -1731,7 +1740,8 @@ XdsClient::XdsClient(grpc_error** error)
interested_parties_(grpc_pollset_set_create()), interested_parties_(grpc_pollset_set_create()),
bootstrap_( bootstrap_(
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
api_(this, &grpc_xds_client_trace, bootstrap_.get()) { api_(this, &grpc_xds_client_trace,
bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
} }
@ -1740,14 +1750,9 @@ XdsClient::XdsClient(grpc_error** error)
this, grpc_error_string(*error)); this, grpc_error_string(*error));
return; return;
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
bootstrap_->server().server_uri.c_str());
}
grpc_channel* channel = CreateXdsChannel(*bootstrap_);
// Create ChannelState object. // Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>( chand_ = MakeOrphanable<ChannelState>(
WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
} }
XdsClient::~XdsClient() { XdsClient::~XdsClient() {

@ -198,7 +198,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
class LrsCallState; class LrsCallState;
ChannelState(WeakRefCountedPtr<XdsClient> xds_client, ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
grpc_channel* channel); const XdsBootstrap::XdsServer& server);
~ChannelState() override; ~ChannelState() override;
void Orphan() override; void Orphan() override;
@ -226,6 +226,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
// The owning xds client. // The owning xds client.
WeakRefCountedPtr<XdsClient> xds_client_; WeakRefCountedPtr<XdsClient> xds_client_;
const XdsBootstrap::XdsServer& server_;
// The channel and its status. // The channel and its status.
grpc_channel* channel_; grpc_channel* channel_;
bool shutting_down_ = false; bool shutting_down_ = false;

Loading…
Cancel
Save