From 7ddb18af574df94bfc2e36c10c7bf43eb67823ed Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 30 Oct 2020 07:29:57 -0700 Subject: [PATCH] xds: Move use_v3 out of XdsApi, so that we can specify it on a per-server basis. --- src/core/ext/xds/xds_api.cc | 25 ++++++------ src/core/ext/xds/xds_api.h | 10 ++--- src/core/ext/xds/xds_client.cc | 71 ++++++++++++++++++---------------- src/core/ext/xds/xds_client.h | 4 +- 4 files changed, 58 insertions(+), 52 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 36a98bd4083..31826767bef 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -552,11 +552,10 @@ bool IsEds(absl::string_view type_url) { } // namespace XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer, - const XdsBootstrap* bootstrap) + const XdsBootstrap::Node* node) : client_(client), tracer_(tracer), - use_v3_(bootstrap != nullptr && bootstrap->server().ShouldUseV3()), - bootstrap_(bootstrap), + node_(node), build_version_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ", grpc_version_string())), user_agent_name_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) {} @@ -657,11 +656,10 @@ void PopulateBuildVersion(upb_arena* arena, envoy_config_core_v3_Node* node_msg, encoded_build_version.size(), arena); } -void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap, +void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, bool use_v3, const std::string& build_version, const std::string& user_agent_name, envoy_config_core_v3_Node* node_msg) { - const XdsBootstrap::Node* node = bootstrap->node(); if (node != nullptr) { if (!node->id.empty()) { envoy_config_core_v3_Node_set_id(node_msg, @@ -694,7 +692,7 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap, } } } - if (!bootstrap->server().ShouldUseV3()) { + if (!use_v3) { PopulateBuildVersion(arena, node_msg, build_version); } envoy_config_core_v3_Node_set_user_agent_name( @@ -758,7 +756,7 @@ absl::string_view TypeUrlExternalToInternal(bool use_v3, } // namespace grpc_slice XdsApi::CreateAdsRequest( - const std::string& type_url, + const XdsBootstrap::XdsServer& server, const std::string& type_url, const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node) { @@ -768,7 +766,7 @@ grpc_slice XdsApi::CreateAdsRequest( envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr()); // Set type_url. absl::string_view real_type_url = - TypeUrlExternalToInternal(use_v3_, type_url); + TypeUrlExternalToInternal(server.ShouldUseV3(), type_url); envoy_service_discovery_v3_DiscoveryRequest_set_type_url( request, StdStringToUpbString(real_type_url)); // Set version_info. @@ -805,8 +803,8 @@ grpc_slice XdsApi::CreateAdsRequest( envoy_config_core_v3_Node* node_msg = envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, - node_msg); + PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_, + user_agent_name_, node_msg); } // Add resource_names. for (const auto& resource_name : resource_names) { @@ -1907,7 +1905,8 @@ grpc_slice SerializeLrsRequest( } // namespace -grpc_slice XdsApi::CreateLrsInitialRequest() { +grpc_slice XdsApi::CreateLrsInitialRequest( + const XdsBootstrap::XdsServer& server) { upb::Arena arena; // Create a request. envoy_service_load_stats_v3_LoadStatsRequest* request = @@ -1916,8 +1915,8 @@ grpc_slice XdsApi::CreateLrsInitialRequest() { envoy_config_core_v3_Node* node_msg = envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, - node_msg); + PopulateNode(arena.ptr(), node_, server.ShouldUseV3(), build_version_, + user_agent_name_, node_msg); envoy_config_core_v3_Node_add_client_features( node_msg, upb_strview_makez("envoy.lrs.supports_send_all_clusters"), arena.ptr()); diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index d7dfd53e8d7..cc13c6adc7f 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -363,11 +363,12 @@ class XdsApi { std::pair, ClusterLoadReport>; - XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap* bootstrap); + XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node); // Creates an ADS request. // Takes ownership of \a error. - grpc_slice CreateAdsRequest(const std::string& type_url, + grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server, + const std::string& type_url, const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, @@ -394,7 +395,7 @@ class XdsApi { const std::set& expected_eds_service_names); // Creates an initial LRS request. - grpc_slice CreateLrsInitialRequest(); + grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server); // Creates an LRS request sending a client-side load report. grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); @@ -410,8 +411,7 @@ class XdsApi { private: XdsClient* client_; TraceFlag* tracer_; - const bool use_v3_; - const XdsBootstrap* bootstrap_; // Do not own. + const XdsBootstrap::Node* node_; // Do not own. upb::SymbolTable symtab_; const std::string build_version_; const std::string user_agent_name_; diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index c49d8f73ceb..a8ef7ab9ec7 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -431,13 +431,40 @@ class XdsClient::ChannelState::StateWatcher // XdsClient::ChannelState // +namespace { + +grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) { + // Build channel args. + absl::InlinedVector args_to_add = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), + 5 * 60 * GPR_MS_PER_SEC), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), + }; + grpc_channel_args* new_args = grpc_channel_args_copy_and_add( + g_channel_args, args_to_add.data(), args_to_add.size()); + // Create channel. + grpc_channel* channel = grpc_secure_channel_create( + server.channel_creds.get(), server.server_uri.c_str(), new_args, nullptr); + grpc_channel_args_destroy(new_args); + return channel; +} + +} // namespace + XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, - grpc_channel* channel) + const XdsBootstrap::XdsServer& server) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState" : nullptr), xds_client_(std::move(xds_client)), - channel_(channel) { + server_(server) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", + xds_client_.get(), server.server_uri.c_str()); + } + channel_ = CreateXdsChannel(server); GPR_ASSERT(channel_ != nullptr); StartConnectivityWatchLocked(); } @@ -646,7 +673,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( GPR_ASSERT(xds_client() != nullptr); // Create a call with the specified method name. const auto& method = - xds_client()->bootstrap_->server().ShouldUseV3() + chand()->server_.ShouldUseV3() ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES; call_ = grpc_channel_create_pollset_set_call( @@ -767,8 +794,9 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( std::set resource_names = ResourceNamesForRequest(type_url); request_payload_slice = xds_client()->api_.CreateAdsRequest( - type_url, resource_names, xds_client()->resource_version_map_[type_url], - state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); + chand()->server_, type_url, resource_names, + xds_client()->resource_version_map_[type_url], state.nonce, + GRPC_ERROR_REF(state.error), !sent_initial_message_); if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { state_map_.erase(type_url); @@ -1401,7 +1429,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); const auto& method = - xds_client()->bootstrap_->server().ShouldUseV3() + chand()->server_.ShouldUseV3() ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS; call_ = grpc_channel_create_pollset_set_call( @@ -1411,7 +1439,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( GPR_ASSERT(call_ != nullptr); // Init the request payload. grpc_slice request_payload_slice = - xds_client()->api_.CreateLrsInitialRequest(); + xds_client()->api_.CreateLrsInitialRequest(chand()->server_); send_message_payload_ = grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); @@ -1702,25 +1730,6 @@ grpc_millis GetRequestTimeout() { {15000, 0, INT_MAX}); } -grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap) { - // Build channel args. - absl::InlinedVector args_to_add = { - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), - 5 * 60 * GPR_MS_PER_SEC), - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), - }; - grpc_channel_args* new_args = grpc_channel_args_copy_and_add( - g_channel_args, args_to_add.data(), args_to_add.size()); - // Create channel. - grpc_channel* channel = grpc_secure_channel_create( - bootstrap.server().channel_creds.get(), - bootstrap.server().server_uri.c_str(), new_args, nullptr); - grpc_channel_args_destroy(new_args); - return channel; -} - } // namespace XdsClient::XdsClient(grpc_error** error) @@ -1731,7 +1740,8 @@ XdsClient::XdsClient(grpc_error** error) interested_parties_(grpc_pollset_set_create()), bootstrap_( XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), - api_(this, &grpc_xds_client_trace, bootstrap_.get()) { + api_(this, &grpc_xds_client_trace, + bootstrap_ == nullptr ? nullptr : bootstrap_->node()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } @@ -1740,14 +1750,9 @@ XdsClient::XdsClient(grpc_error** error) this, grpc_error_string(*error)); return; } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this, - bootstrap_->server().server_uri.c_str()); - } - grpc_channel* channel = CreateXdsChannel(*bootstrap_); // Create ChannelState object. chand_ = MakeOrphanable( - WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); + WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server()); } XdsClient::~XdsClient() { diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 15e95c6c8ca..49ec9dcc32a 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -198,7 +198,7 @@ class XdsClient : public DualRefCounted { class LrsCallState; ChannelState(WeakRefCountedPtr xds_client, - grpc_channel* channel); + const XdsBootstrap::XdsServer& server); ~ChannelState() override; void Orphan() override; @@ -226,6 +226,8 @@ class XdsClient : public DualRefCounted { // The owning xds client. WeakRefCountedPtr xds_client_; + const XdsBootstrap::XdsServer& server_; + // The channel and its status. grpc_channel* channel_; bool shutting_down_ = false;