Merge pull request #23787 from markdroth/xds_client_api2

Pass listening addresses into XdsClient.
pull/23770/head^2
Mark D. Roth 5 years ago committed by GitHub
commit 0649026e8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  2. 1
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  3. 48
      src/core/ext/xds/xds_api.cc
  4. 11
      src/core/ext/xds/xds_api.h
  5. 6
      src/core/ext/xds/xds_client.cc
  6. 6
      src/core/ext/xds/xds_client.h

@ -457,6 +457,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
work_serializer(), interested_parties(), GetEdsResourceName(),
std::vector<grpc_resolved_address>{},
nullptr /* service config watcher */, *args_, &error);
// TODO(roth): If we decide that we care about EDS-only mode, add
// proper error handling here.

@ -191,6 +191,7 @@ void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
work_serializer(), interested_parties_, server_name_,
std::vector<grpc_resolved_address>{},
absl::make_unique<ListenerWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,

@ -24,6 +24,7 @@
#include <cstdlib>
#include <string>
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
@ -39,6 +40,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -463,6 +465,7 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap,
const std::string& build_version,
const std::string& user_agent_name,
const std::string& server_name,
const std::vector<grpc_resolved_address>& listening_addresses,
envoy_config_core_v3_Node* node_msg) {
const XdsBootstrap::Node* node = bootstrap->node();
if (node != nullptr) {
@ -510,6 +513,21 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap* bootstrap,
if (!bootstrap->server().ShouldUseV3()) {
PopulateBuildVersion(arena, node_msg, build_version);
}
for (const grpc_resolved_address& address : listening_addresses) {
std::string address_str = grpc_sockaddr_to_string(&address, false);
absl::string_view addr_str;
absl::string_view port_str;
GPR_ASSERT(SplitHostPort(address_str, &addr_str, &port_str));
uint32_t port;
GPR_ASSERT(absl::SimpleAtoi(port_str, &port));
auto* addr_msg =
envoy_config_core_v3_Node_add_listening_addresses(node_msg, arena);
auto* socket_addr_msg =
envoy_config_core_v3_Address_mutable_socket_address(addr_msg, arena);
envoy_config_core_v3_SocketAddress_set_address(
socket_addr_msg, upb_strview_make(addr_str.data(), addr_str.size()));
envoy_config_core_v3_SocketAddress_set_port_value(socket_addr_msg, port);
}
envoy_config_core_v3_Node_set_user_agent_name(
node_msg,
upb_strview_make(user_agent_name.data(), user_agent_name.size()));
@ -628,6 +646,29 @@ void AddNodeLogFields(const envoy_config_core_v3_Node* node,
fields->emplace_back(
absl::StrCat(" build_version: \"", build_version, "\""));
}
// listening_addresses
size_t num_listening_addresses;
const envoy_config_core_v3_Address* const* listening_addresses =
envoy_config_core_v3_Node_listening_addresses(node,
&num_listening_addresses);
for (size_t i = 0; i < num_listening_addresses; ++i) {
fields->emplace_back(" listening_address {");
const auto* socket_addr_msg =
envoy_config_core_v3_Address_socket_address(listening_addresses[i]);
if (socket_addr_msg != nullptr) {
fields->emplace_back(" socket_address {");
AddStringField(
" address",
envoy_config_core_v3_SocketAddress_address(socket_addr_msg), fields);
if (envoy_config_core_v3_SocketAddress_has_port_value(socket_addr_msg)) {
fields->emplace_back(absl::StrCat(
" port_value: ",
envoy_config_core_v3_SocketAddress_port_value(socket_addr_msg)));
}
fields->emplace_back(" }");
}
fields->emplace_back(" }");
}
// user_agent_name
AddStringField(" user_agent_name",
envoy_config_core_v3_Node_user_agent_name(node), fields);
@ -730,7 +771,8 @@ grpc_slice XdsApi::CreateAdsRequest(
const std::string& type_url,
const std::set<absl::string_view>& resource_names,
const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node) {
bool populate_node,
const std::vector<grpc_resolved_address>& listening_addresses) {
upb::Arena arena;
// Create a request.
envoy_service_discovery_v3_DiscoveryRequest* request =
@ -771,7 +813,7 @@ grpc_slice XdsApi::CreateAdsRequest(
envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request,
arena.ptr());
PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_, "",
node_msg);
listening_addresses, node_msg);
}
// Add resource_names.
for (const auto& resource_name : resource_names) {
@ -2197,7 +2239,7 @@ grpc_slice XdsApi::CreateLrsInitialRequest(const std::string& server_name) {
envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
arena.ptr());
PopulateNode(arena.ptr(), bootstrap_, build_version_, user_agent_name_,
server_name, node_msg);
server_name, {}, node_msg);
envoy_config_core_v3_Node_add_client_features(
node_msg, upb_strview_makez("envoy.lrs.supports_send_all_clusters"),
arena.ptr());

@ -295,11 +295,12 @@ class XdsApi {
// Creates an ADS request.
// Takes ownership of \a error.
grpc_slice CreateAdsRequest(const std::string& type_url,
const std::set<absl::string_view>& resource_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
grpc_slice CreateAdsRequest(
const std::string& type_url,
const std::set<absl::string_view>& resource_names,
const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node,
const std::vector<grpc_resolved_address>& listening_addresses);
// Parses an ADS response.
// If the response can't be parsed at the top level, the resulting

@ -679,7 +679,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
GPR_ASSERT(!xds_client()->server_name_.empty());
// Create a call with the specified method name.
const auto& method =
xds_client()->bootstrap_->server().ShouldUseV3()
@ -806,7 +805,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest(
type_url, resource_names, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
GRPC_ERROR_REF(state.error), !sent_initial_message_,
xds_client()->listening_addresses_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
state_map_.erase(type_url);
@ -1745,6 +1745,7 @@ grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
grpc_pollset_set* interested_parties,
absl::string_view server_name,
std::vector<grpc_resolved_address> listening_addresses,
std::unique_ptr<ListenerWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
@ -1755,6 +1756,7 @@ XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
api_(this, &grpc_xds_client_trace, bootstrap_.get()),
server_name_(server_name),
listening_addresses_(std::move(listening_addresses)),
listener_watcher_(std::move(watcher)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);

@ -20,6 +20,7 @@
#include <grpc/support/port_platform.h>
#include <set>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -32,6 +33,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
@ -76,10 +78,13 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
virtual void OnResourceDoesNotExist() = 0;
};
// gRPC client should populate server_name.
// gRPC server should populate listening_addresses.
// If *error is not GRPC_ERROR_NONE after construction, then there was
// an error initializing the client.
XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
grpc_pollset_set* interested_parties, absl::string_view server_name,
std::vector<grpc_resolved_address> listening_addresses,
std::unique_ptr<ListenerWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error);
~XdsClient();
@ -251,6 +256,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
XdsApi api_;
const std::string server_name_;
const std::vector<grpc_resolved_address> listening_addresses_;
std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
// The channel for communicating with the xds server.

Loading…
Cancel
Save