Move xds API code into a class, so that common state can be cached.

pull/21933/head
Mark D. Roth 5 years ago
parent c564d28f9a
commit f739557803
  1. 4
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 16
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 361
      src/core/ext/filters/client_channel/xds/xds_api.cc
  4. 355
      src/core/ext/filters/client_channel/xds/xds_api.h
  5. 132
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 9
      src/core/ext/filters/client_channel/xds/xds_client.h

@ -63,7 +63,7 @@ class CdsLb : public LoadBalancingPolicy {
public:
explicit ClusterWatcher(RefCountedPtr<CdsLb> parent)
: parent_(std::move(parent)) {}
void OnClusterChanged(CdsUpdate cluster_data) override;
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override;
void OnError(grpc_error* error) override;
private:
@ -111,7 +111,7 @@ class CdsLb : public LoadBalancingPolicy {
// CdsLb::ClusterWatcher
//
void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) {
void CdsLb::ClusterWatcher::OnClusterChanged(XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client",
parent_.get());

@ -172,7 +172,7 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLb> xds_policy_;
PickerList pickers_;
RefCountedPtr<XdsDropConfig> drop_config_;
RefCountedPtr<XdsApi::DropConfig> drop_config_;
};
class FallbackHelper : public ChannelControlHelper {
@ -286,7 +286,7 @@ class XdsLb : public LoadBalancingPolicy {
~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
void UpdateLocked(
const XdsPriorityListUpdate::LocalityMap& locality_map_update);
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update);
void ResetBackoffLocked();
void UpdateXdsPickerLocked();
OrphanablePtr<Locality> ExtractLocalityLocked(
@ -316,10 +316,10 @@ class XdsLb : public LoadBalancingPolicy {
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
static void OnFailoverTimerLocked(void* arg, grpc_error* error);
const XdsPriorityListUpdate& priority_list_update() const {
const XdsApi::PriorityListUpdate& priority_list_update() const {
return xds_policy_->priority_list_update_;
}
const XdsPriorityListUpdate::LocalityMap* locality_map_update() const {
const XdsApi::PriorityListUpdate::LocalityMap* locality_map_update() const {
return xds_policy_->priority_list_update_.Find(priority_);
}
@ -431,10 +431,10 @@ class XdsLb : public LoadBalancingPolicy {
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
// The update for priority_list_.
XdsPriorityListUpdate priority_list_update_;
XdsApi::PriorityListUpdate priority_list_update_;
// The config for dropping calls.
RefCountedPtr<XdsDropConfig> drop_config_;
RefCountedPtr<XdsApi::DropConfig> drop_config_;
// The stats for client-side load reporting.
XdsClientStats client_stats_;
@ -594,7 +594,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
~EndpointWatcher() { xds_policy_.reset(DEBUG_LOCATION, "EndpointWatcher"); }
void OnEndpointChanged(EdsUpdate update) override {
void OnEndpointChanged(XdsApi::EdsUpdate update) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Received EDS update from xds client",
xds_policy_.get());
@ -1154,7 +1154,7 @@ XdsLb::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
}
void XdsLb::LocalityMap::UpdateLocked(
const XdsPriorityListUpdate::LocalityMap& locality_map_update) {
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update) {
if (xds_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Start Updating priority %" PRIu32,

@ -56,8 +56,12 @@
namespace grpc_core {
bool XdsPriorityListUpdate::operator==(
const XdsPriorityListUpdate& other) const {
//
// XdsApi::PriorityListUpdate
//
bool XdsApi::PriorityListUpdate::operator==(
const XdsApi::PriorityListUpdate& other) const {
if (priorities_.size() != other.priorities_.size()) return false;
for (size_t i = 0; i < priorities_.size(); ++i) {
if (priorities_[i].localities != other.priorities_[i].localities) {
@ -67,8 +71,8 @@ bool XdsPriorityListUpdate::operator==(
return true;
}
void XdsPriorityListUpdate::Add(
XdsPriorityListUpdate::LocalityMap::Locality locality) {
void XdsApi::PriorityListUpdate::Add(
XdsApi::PriorityListUpdate::LocalityMap::Locality locality) {
// Pad the missing priorities in case the localities are not ordered by
// priority.
if (!Contains(locality.priority)) priorities_.resize(locality.priority + 1);
@ -76,13 +80,13 @@ void XdsPriorityListUpdate::Add(
locality_map.localities.emplace(locality.name, std::move(locality));
}
const XdsPriorityListUpdate::LocalityMap* XdsPriorityListUpdate::Find(
const XdsApi::PriorityListUpdate::LocalityMap* XdsApi::PriorityListUpdate::Find(
uint32_t priority) const {
if (!Contains(priority)) return nullptr;
return &priorities_[priority];
}
bool XdsPriorityListUpdate::Contains(
bool XdsApi::PriorityListUpdate::Contains(
const RefCountedPtr<XdsLocalityName>& name) {
for (size_t i = 0; i < priorities_.size(); ++i) {
const LocalityMap& locality_map = priorities_[i];
@ -91,7 +95,11 @@ bool XdsPriorityListUpdate::Contains(
return false;
}
bool XdsDropConfig::ShouldDrop(const std::string** category_name) const {
//
// XdsApi::DropConfig
//
bool XdsApi::DropConfig::ShouldDrop(const std::string** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
@ -104,6 +112,17 @@ bool XdsDropConfig::ShouldDrop(const std::string** category_name) const {
return false;
}
//
// XdsApi
//
const char* XdsApi::kLdsTypeUrl = "type.googleapis.com/envoy.api.v2.Listener";
const char* XdsApi::kRdsTypeUrl =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
const char* XdsApi::kCdsTypeUrl = "type.googleapis.com/envoy.api.v2.Cluster";
const char* XdsApi::kEdsTypeUrl =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
namespace {
void PopulateMetadataValue(upb_arena* arena, google_protobuf_Value* value_pb,
@ -203,67 +222,21 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node,
upb_strview_makez(build_version));
}
} // namespace
grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode(
const std::string& type_url, const std::string& nonce, grpc_error* error) {
upb::Arena arena;
envoy_api_v2_DiscoveryRequest* CreateDiscoveryRequest(
upb_arena* arena, const char* type_url, const std::string& version,
const std::string& nonce, grpc_error* error, const XdsBootstrap::Node* node,
const char* build_version) {
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
envoy_api_v2_DiscoveryRequest_new(arena);
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(
request, upb_strview_makez(type_url.c_str()));
// Set nonce.
envoy_api_v2_DiscoveryRequest_set_response_nonce(
request, upb_strview_makez(nonce.c_str()));
// Set error_detail.
grpc_slice error_description_slice;
GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
&error_description_slice));
upb_strview error_description_strview =
upb_strview_make(reinterpret_cast<const char*>(
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena.ptr());
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
return grpc_slice_from_copied_buffer(output, output_length);
}
grpc_slice XdsLdsRequestCreateAndEncode(const std::string& server_name,
const XdsBootstrap::Node* node,
const char* build_version,
const std::string& version,
const std::string& nonce,
grpc_error* error) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(type_url));
// Set version_info.
if (!version.empty()) {
envoy_api_v2_DiscoveryRequest_set_version_info(
request, upb_strview_makez(version.c_str()));
}
// Populate node.
if (build_version != nullptr) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
PopulateNode(arena.ptr(), node, build_version, node_msg);
}
// Add resource_name.
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_make(server_name.data(), server_name.size()),
arena.ptr());
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kLdsTypeUrl));
// Set nonce.
if (!nonce.empty()) {
envoy_api_v2_DiscoveryRequest_set_response_nonce(
@ -279,148 +252,98 @@ grpc_slice XdsLdsRequestCreateAndEncode(const std::string& server_name,
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
arena.ptr());
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena);
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
}
// Encode the request.
// Populate node.
if (build_version != nullptr) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena);
PopulateNode(arena, node, build_version, node_msg);
}
return request;
}
grpc_slice SerializeDiscoveryRequest(upb_arena* arena,
envoy_api_v2_DiscoveryRequest* request) {
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
char* output =
envoy_api_v2_DiscoveryRequest_serialize(request, arena, &output_length);
return grpc_slice_from_copied_buffer(output, output_length);
}
grpc_slice XdsRdsRequestCreateAndEncode(const std::string& route_config_name,
const XdsBootstrap::Node* node,
const char* build_version,
const std::string& version,
const std::string& nonce,
grpc_error* error) {
} // namespace
grpc_slice XdsApi::CreateUnsupportedTypeNackRequest(const std::string& type_url,
const std::string& nonce,
grpc_error* error) {
upb::Arena arena;
envoy_api_v2_DiscoveryRequest* request = CreateDiscoveryRequest(
arena.ptr(), type_url.c_str(), /*version=*/"", nonce, error,
/*node=*/nullptr, /*build_version=*/nullptr);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
grpc_slice XdsApi::CreateLdsRequest(const std::string& server_name,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
// Set version_info.
if (!version.empty()) {
envoy_api_v2_DiscoveryRequest_set_version_info(
request, upb_strview_makez(version.c_str()));
}
// Populate node.
if (build_version != nullptr) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
PopulateNode(arena.ptr(), node, build_version, node_msg);
}
CreateDiscoveryRequest(arena.ptr(), kLdsTypeUrl, version, nonce, error,
populate_node ? node_ : nullptr,
populate_node ? build_version_ : nullptr);
// Add resource_name.
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_make(server_name.data(), server_name.size()),
arena.ptr());
return SerializeDiscoveryRequest(arena.ptr(), request);
}
grpc_slice XdsApi::CreateRdsRequest(const std::string& route_config_name,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node) {
upb::Arena arena;
envoy_api_v2_DiscoveryRequest* request =
CreateDiscoveryRequest(arena.ptr(), kRdsTypeUrl, version, nonce, error,
populate_node ? node_ : nullptr,
populate_node ? build_version_ : nullptr);
// Add resource_name.
envoy_api_v2_DiscoveryRequest_add_resource_names(
request,
upb_strview_make(route_config_name.data(), route_config_name.size()),
arena.ptr());
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kRdsTypeUrl));
// Set nonce.
if (!nonce.empty()) {
envoy_api_v2_DiscoveryRequest_set_response_nonce(
request, upb_strview_makez(nonce.c_str()));
}
// Set error_detail if it's a NACK.
if (error != GRPC_ERROR_NONE) {
grpc_slice error_description_slice;
GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
&error_description_slice));
upb_strview error_description_strview =
upb_strview_make(reinterpret_cast<const char*>(
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
arena.ptr());
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
}
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
return grpc_slice_from_copied_buffer(output, output_length);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
grpc_slice XdsCdsRequestCreateAndEncode(
const std::set<StringView>& cluster_names, const XdsBootstrap::Node* node,
const char* build_version, const std::string& version,
const std::string& nonce, grpc_error* error) {
grpc_slice XdsApi::CreateCdsRequest(const std::set<StringView>& cluster_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
// Set version_info.
if (!version.empty()) {
envoy_api_v2_DiscoveryRequest_set_version_info(
request, upb_strview_makez(version.c_str()));
}
// Populate node.
if (build_version != nullptr) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
PopulateNode(arena.ptr(), node, build_version, node_msg);
}
CreateDiscoveryRequest(arena.ptr(), kCdsTypeUrl, version, nonce, error,
populate_node ? node_ : nullptr,
populate_node ? build_version_ : nullptr);
// Add resource_names.
for (const auto& cluster_name : cluster_names) {
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_make(cluster_name.data(), cluster_name.size()),
arena.ptr());
}
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kCdsTypeUrl));
// Set nonce.
if (!nonce.empty()) {
envoy_api_v2_DiscoveryRequest_set_response_nonce(
request, upb_strview_makez(nonce.c_str()));
}
// Set error_detail if it's a NACK.
if (error != GRPC_ERROR_NONE) {
grpc_slice error_description_slice;
GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
&error_description_slice));
upb_strview error_description_strview =
upb_strview_make(reinterpret_cast<const char*>(
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
arena.ptr());
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
}
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
return grpc_slice_from_copied_buffer(output, output_length);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
grpc_slice XdsEdsRequestCreateAndEncode(
const std::set<StringView>& eds_service_names,
const XdsBootstrap::Node* node, const char* build_version,
const std::string& version, const std::string& nonce, grpc_error* error) {
grpc_slice XdsApi::CreateEdsRequest(
const std::set<StringView>& eds_service_names, const std::string& version,
const std::string& nonce, grpc_error* error, bool populate_node) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
// Set version_info.
if (!version.empty()) {
envoy_api_v2_DiscoveryRequest_set_version_info(
request, upb_strview_makez(version.c_str()));
}
// Populate node.
if (build_version != nullptr) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
PopulateNode(arena.ptr(), node, build_version, node_msg);
}
CreateDiscoveryRequest(arena.ptr(), kEdsTypeUrl, version, nonce, error,
populate_node ? node_ : nullptr,
populate_node ? build_version_ : nullptr);
// Add resource_names.
for (const auto& eds_service_name : eds_service_names) {
envoy_api_v2_DiscoveryRequest_add_resource_names(
@ -428,34 +351,7 @@ grpc_slice XdsEdsRequestCreateAndEncode(
upb_strview_make(eds_service_name.data(), eds_service_name.size()),
arena.ptr());
}
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kEdsTypeUrl));
// Set nonce.
if (!nonce.empty()) {
envoy_api_v2_DiscoveryRequest_set_response_nonce(
request, upb_strview_makez(nonce.c_str()));
}
// Set error_detail if it's a NACK.
if (error != GRPC_ERROR_NONE) {
grpc_slice error_description_slice;
GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
&error_description_slice));
upb_strview error_description_strview =
upb_strview_make(reinterpret_cast<const char*>(
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
arena.ptr());
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
}
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
return grpc_slice_from_copied_buffer(output, output_length);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
namespace {
@ -511,7 +407,7 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) {
grpc_error* RouteConfigParse(
const envoy_api_v2_RouteConfiguration* route_config,
const std::string& expected_server_name, RdsUpdate* rds_update) {
const std::string& expected_server_name, XdsApi::RdsUpdate* rds_update) {
// Strip off port from server name, if any.
size_t pos = expected_server_name.find(':');
std::string expected_host_name = expected_server_name.substr(0, pos);
@ -604,11 +500,9 @@ grpc_error* RouteConfigParse(
return GRPC_ERROR_NONE;
}
} // namespace
grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
LdsUpdate* lds_update, upb_arena* arena) {
XdsApi::LdsUpdate* lds_update, upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
@ -620,7 +514,7 @@ grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
for (size_t i = 0; i < size; ++i) {
// Check the type_url of the resource.
const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
if (!upb_strview_eql(type_url, upb_strview_makez(kLdsTypeUrl))) {
if (!upb_strview_eql(type_url, upb_strview_makez(XdsApi::kLdsTypeUrl))) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not LDS.");
}
// Decode the listener.
@ -655,7 +549,7 @@ grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
const envoy_api_v2_RouteConfiguration* route_config =
envoy_config_filter_network_http_connection_manager_v2_HttpConnectionManager_route_config(
http_connection_manager);
RdsUpdate rds_update;
XdsApi::RdsUpdate rds_update;
grpc_error* error =
RouteConfigParse(route_config, expected_server_name, &rds_update);
if (error != GRPC_ERROR_NONE) return error;
@ -690,7 +584,7 @@ grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
RdsUpdate* rds_update, upb_arena* arena) {
XdsApi::RdsUpdate* rds_update, upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
@ -702,7 +596,7 @@ grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
for (size_t i = 0; i < size; ++i) {
// Check the type_url of the resource.
const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
if (!upb_strview_eql(type_url, upb_strview_makez(kRdsTypeUrl))) {
if (!upb_strview_eql(type_url, upb_strview_makez(XdsApi::kRdsTypeUrl))) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not RDS.");
}
// Decode the route_config.
@ -720,7 +614,7 @@ grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
upb_strview_makez(expected_route_config_name.c_str());
if (!upb_strview_eql(name, expected_name)) continue;
// Parse the route_config.
RdsUpdate local_rds_update;
XdsApi::RdsUpdate local_rds_update;
grpc_error* error =
RouteConfigParse(route_config, expected_server_name, &local_rds_update);
if (error != GRPC_ERROR_NONE) return error;
@ -732,7 +626,8 @@ grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
}
grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
CdsUpdateMap* cds_update_map, upb_arena* arena) {
XdsApi::CdsUpdateMap* cds_update_map,
upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
@ -743,10 +638,10 @@ grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
}
// Parse all the resources in the CDS response.
for (size_t i = 0; i < size; ++i) {
CdsUpdate cds_update;
XdsApi::CdsUpdate cds_update;
// Check the type_url of the resource.
const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
if (!upb_strview_eql(type_url, upb_strview_makez(kCdsTypeUrl))) {
if (!upb_strview_eql(type_url, upb_strview_makez(XdsApi::kCdsTypeUrl))) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not CDS.");
}
// Decode the cluster.
@ -801,8 +696,6 @@ grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
return GRPC_ERROR_NONE;
}
namespace {
grpc_error* ServerAddressParseAndAppend(
const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint,
ServerAddressList* list) {
@ -840,7 +733,7 @@ grpc_error* ServerAddressParseAndAppend(
grpc_error* LocalityParse(
const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
XdsPriorityListUpdate::LocalityMap::Locality* output_locality) {
XdsApi::PriorityListUpdate::LocalityMap::Locality* output_locality) {
// Parse LB weight.
const google_protobuf_UInt32Value* lb_weight =
envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
@ -878,7 +771,7 @@ grpc_error* LocalityParse(
grpc_error* DropParseAndAppend(
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
XdsDropConfig* drop_config, bool* drop_all) {
XdsApi::DropConfig* drop_config, bool* drop_all) {
// Get the category.
upb_strview category =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
@ -918,7 +811,7 @@ grpc_error* DropParseAndAppend(
grpc_error* EdsResponsedParse(
const envoy_api_v2_DiscoveryResponse* response,
const std::set<StringView>& expected_eds_service_names,
EdsUpdateMap* eds_update_map, upb_arena* arena) {
XdsApi::EdsUpdateMap* eds_update_map, upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
@ -928,10 +821,10 @@ grpc_error* EdsResponsedParse(
"EDS response contains 0 resource.");
}
for (size_t i = 0; i < size; ++i) {
EdsUpdate eds_update;
XdsApi::EdsUpdate eds_update;
// Check the type_url of the resource.
upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
if (!upb_strview_eql(type_url, upb_strview_makez(kEdsTypeUrl))) {
if (!upb_strview_eql(type_url, upb_strview_makez(XdsApi::kEdsTypeUrl))) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
}
// Get the cluster_load_assignment.
@ -960,7 +853,7 @@ grpc_error* EdsResponsedParse(
envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
&locality_size);
for (size_t j = 0; j < locality_size; ++j) {
XdsPriorityListUpdate::LocalityMap::Locality locality;
XdsApi::PriorityListUpdate::LocalityMap::Locality locality;
grpc_error* error = LocalityParse(endpoints[j], &locality);
if (error != GRPC_ERROR_NONE) return error;
// Filter out locality with weight 0.
@ -968,7 +861,7 @@ grpc_error* EdsResponsedParse(
eds_update.priority_list_update.Add(locality);
}
// Get the drop config.
eds_update.drop_config = MakeRefCounted<XdsDropConfig>();
eds_update.drop_config = MakeRefCounted<XdsApi::DropConfig>();
const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
if (policy != nullptr) {
@ -998,7 +891,7 @@ grpc_error* EdsResponsedParse(
} // namespace
grpc_error* XdsAdsResponseDecodeAndParse(
grpc_error* XdsApi::ParseAdsResponse(
const grpc_slice& encoded_response, const std::string& expected_server_name,
const std::string& expected_route_config_name,
const std::set<StringView>& expected_eds_service_names,
@ -1047,7 +940,7 @@ grpc_error* XdsAdsResponseDecodeAndParse(
namespace {
grpc_slice LrsRequestEncode(
grpc_slice SerializeLrsRequest(
const envoy_service_load_stats_v2_LoadStatsRequest* request,
upb_arena* arena) {
size_t output_length;
@ -1058,9 +951,7 @@ grpc_slice LrsRequestEncode(
} // namespace
grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
const XdsBootstrap::Node* node,
const char* build_version) {
grpc_slice XdsApi::CreateLrsInitialRequest(const std::string& server_name) {
upb::Arena arena;
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
@ -1069,7 +960,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
envoy_api_v2_core_Node* node_msg =
envoy_service_load_stats_v2_LoadStatsRequest_mutable_node(request,
arena.ptr());
PopulateNode(arena.ptr(), node, build_version, node_msg);
PopulateNode(arena.ptr(), node_, build_version_, node_msg);
// Add cluster stats. There is only one because we only use one server name in
// one channel.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
@ -1078,7 +969,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats, upb_strview_makez(server_name.c_str()));
return LrsRequestEncode(request, arena.ptr());
return SerializeLrsRequest(request, arena.ptr());
}
namespace {
@ -1123,7 +1014,7 @@ void LocalityStatsPopulate(
} // namespace
grpc_slice XdsLrsRequestCreateAndEncode(
grpc_slice XdsApi::CreateLrsRequest(
std::map<StringView, std::set<XdsClientStats*>, StringLess>
client_stats_map) {
upb::Arena arena;
@ -1193,12 +1084,12 @@ grpc_slice XdsLrsRequestCreateAndEncode(
timespec.tv_nsec);
}
}
return LrsRequestEncode(request, arena.ptr());
return SerializeLrsRequest(request, arena.ptr());
}
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
std::set<std::string>* cluster_names,
grpc_millis* load_reporting_interval) {
grpc_error* XdsApi::ParseLrsResponse(const grpc_slice& encoded_response,
std::set<std::string>* cluster_names,
grpc_millis* load_reporting_interval) {
upb::Arena arena;
// Decode the response.
const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =

@ -34,215 +34,218 @@
namespace grpc_core {
constexpr char kLdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Listener";
constexpr char kRdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
constexpr char kEdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
struct RdsUpdate {
// The name to use in the CDS request.
std::string cluster_name;
};
struct LdsUpdate {
// The name to use in the RDS request.
std::string route_config_name;
// The name to use in the CDS request. Present if the LDS response has it
// inlined.
Optional<RdsUpdate> rds_update;
};
class XdsApi {
public:
static const char* kLdsTypeUrl;
static const char* kRdsTypeUrl;
static const char* kCdsTypeUrl;
static const char* kEdsTypeUrl;
struct RdsUpdate {
// The name to use in the CDS request.
std::string cluster_name;
};
using LdsUpdateMap = std::map<std::string /*server_name*/, LdsUpdate>;
struct LdsUpdate {
// The name to use in the RDS request.
std::string route_config_name;
// The name to use in the CDS request. Present if the LDS response has it
// inlined.
Optional<RdsUpdate> rds_update;
};
using RdsUpdateMap = std::map<std::string /*route_config_name*/, RdsUpdate>;
using LdsUpdateMap = std::map<std::string /*server_name*/, LdsUpdate>;
struct CdsUpdate {
// The name to use in the EDS request.
// If empty, the cluster name will be used.
std::string eds_service_name;
// The LRS server to use for load reporting.
// If not set, load reporting will be disabled.
// If set to the empty string, will use the same server we obtained the CDS
// data from.
Optional<std::string> lrs_load_reporting_server_name;
};
using RdsUpdateMap = std::map<std::string /*route_config_name*/, RdsUpdate>;
using CdsUpdateMap = std::map<std::string /*cluster_name*/, CdsUpdate>;
struct CdsUpdate {
// The name to use in the EDS request.
// If empty, the cluster name will be used.
std::string eds_service_name;
// The LRS server to use for load reporting.
// If not set, load reporting will be disabled.
// If set to the empty string, will use the same server we obtained the CDS
// data from.
Optional<std::string> lrs_load_reporting_server_name;
};
class XdsPriorityListUpdate {
public:
struct LocalityMap {
struct Locality {
bool operator==(const Locality& other) const {
return *name == *other.name && serverlist == other.serverlist &&
lb_weight == other.lb_weight && priority == other.priority;
}
using CdsUpdateMap = std::map<std::string /*cluster_name*/, CdsUpdate>;
// This comparator only compares the locality names.
struct Less {
bool operator()(const Locality& lhs, const Locality& rhs) const {
return XdsLocalityName::Less()(lhs.name, rhs.name);
class PriorityListUpdate {
public:
struct LocalityMap {
struct Locality {
bool operator==(const Locality& other) const {
return *name == *other.name && serverlist == other.serverlist &&
lb_weight == other.lb_weight && priority == other.priority;
}
// This comparator only compares the locality names.
struct Less {
bool operator()(const Locality& lhs, const Locality& rhs) const {
return XdsLocalityName::Less()(lhs.name, rhs.name);
}
};
RefCountedPtr<XdsLocalityName> name;
ServerAddressList serverlist;
uint32_t lb_weight;
uint32_t priority;
};
RefCountedPtr<XdsLocalityName> name;
ServerAddressList serverlist;
uint32_t lb_weight;
uint32_t priority;
bool Contains(const RefCountedPtr<XdsLocalityName>& name) const {
return localities.find(name) != localities.end();
}
size_t size() const { return localities.size(); }
std::map<RefCountedPtr<XdsLocalityName>, Locality, XdsLocalityName::Less>
localities;
};
bool Contains(const RefCountedPtr<XdsLocalityName>& name) const {
return localities.find(name) != localities.end();
bool operator==(const PriorityListUpdate& other) const;
bool operator!=(const PriorityListUpdate& other) const {
return !(*this == other);
}
size_t size() const { return localities.size(); }
void Add(LocalityMap::Locality locality);
std::map<RefCountedPtr<XdsLocalityName>, Locality, XdsLocalityName::Less>
localities;
};
const LocalityMap* Find(uint32_t priority) const;
bool operator==(const XdsPriorityListUpdate& other) const;
bool operator!=(const XdsPriorityListUpdate& other) const {
return !(*this == other);
}
bool Contains(uint32_t priority) const {
return priority < priorities_.size();
}
bool Contains(const RefCountedPtr<XdsLocalityName>& name);
void Add(LocalityMap::Locality locality);
bool empty() const { return priorities_.empty(); }
size_t size() const { return priorities_.size(); }
const LocalityMap* Find(uint32_t priority) const;
// Callers should make sure the priority list is non-empty.
uint32_t LowestPriority() const {
return static_cast<uint32_t>(priorities_.size()) - 1;
}
bool Contains(uint32_t priority) const {
return priority < priorities_.size();
}
bool Contains(const RefCountedPtr<XdsLocalityName>& name);
private:
InlinedVector<LocalityMap, 2> priorities_;
};
bool empty() const { return priorities_.empty(); }
size_t size() const { return priorities_.size(); }
// There are two phases of accessing this class's content:
// 1. to initialize in the control plane combiner;
// 2. to use in the data plane combiner.
// So no additional synchronization is needed.
class DropConfig : public RefCounted<DropConfig> {
public:
struct DropCategory {
bool operator==(const DropCategory& other) const {
return name == other.name &&
parts_per_million == other.parts_per_million;
}
// Callers should make sure the priority list is non-empty.
uint32_t LowestPriority() const {
return static_cast<uint32_t>(priorities_.size()) - 1;
}
std::string name;
const uint32_t parts_per_million;
};
private:
InlinedVector<LocalityMap, 2> priorities_;
};
using DropCategoryList = InlinedVector<DropCategory, 2>;
// There are two phases of accessing this class's content:
// 1. to initialize in the control plane combiner;
// 2. to use in the data plane combiner.
// So no additional synchronization is needed.
class XdsDropConfig : public RefCounted<XdsDropConfig> {
public:
struct DropCategory {
bool operator==(const DropCategory& other) const {
return name == other.name && parts_per_million == other.parts_per_million;
void AddCategory(std::string name, uint32_t parts_per_million) {
drop_category_list_.emplace_back(
DropCategory{std::move(name), parts_per_million});
}
std::string name;
const uint32_t parts_per_million;
};
// The only method invoked from the data plane combiner.
bool ShouldDrop(const std::string** category_name) const;
using DropCategoryList = InlinedVector<DropCategory, 2>;
const DropCategoryList& drop_category_list() const {
return drop_category_list_;
}
void AddCategory(std::string name, uint32_t parts_per_million) {
drop_category_list_.emplace_back(
DropCategory{std::move(name), parts_per_million});
}
bool operator==(const DropConfig& other) const {
return drop_category_list_ == other.drop_category_list_;
}
bool operator!=(const DropConfig& other) const { return !(*this == other); }
// The only method invoked from the data plane combiner.
bool ShouldDrop(const std::string** category_name) const;
private:
DropCategoryList drop_category_list_;
};
const DropCategoryList& drop_category_list() const {
return drop_category_list_;
}
struct EdsUpdate {
PriorityListUpdate priority_list_update;
RefCountedPtr<DropConfig> drop_config;
bool drop_all = false;
};
bool operator==(const XdsDropConfig& other) const {
return drop_category_list_ == other.drop_category_list_;
}
bool operator!=(const XdsDropConfig& other) const {
return !(*this == other);
}
using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>;
XdsApi(const XdsBootstrap::Node* node, const char* build_version)
: node_(node), build_version_(build_version) {}
// Creates a request to nack an unsupported resource type.
// Takes ownership of \a error.
grpc_slice CreateUnsupportedTypeNackRequest(const std::string& type_url,
const std::string& nonce,
grpc_error* error);
// Creates an LDS request querying \a server_name.
// Takes ownership of \a error.
grpc_slice CreateLdsRequest(const std::string& server_name,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates an RDS request querying \a route_config_name.
// Takes ownership of \a error.
grpc_slice CreateRdsRequest(const std::string& route_config_name,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates a CDS request querying \a cluster_names.
// Takes ownership of \a error.
grpc_slice CreateCdsRequest(const std::set<StringView>& cluster_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates an EDS request querying \a eds_service_names.
// Takes ownership of \a error.
grpc_slice CreateEdsRequest(const std::set<StringView>& eds_service_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Parses the ADS response and outputs the validated update for either CDS or
// EDS. If the response can't be parsed at the top level, \a type_url will
// point to an empty string; otherwise, it will point to the received data.
grpc_error* ParseAdsResponse(
const grpc_slice& encoded_response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const std::set<StringView>& expected_eds_service_names,
LdsUpdate* lds_update, RdsUpdate* rds_update,
CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map,
std::string* version, std::string* nonce, std::string* type_url);
// Creates an LRS request querying \a server_name.
grpc_slice CreateLrsInitialRequest(const std::string& server_name);
// Creates an LRS request sending client-side load reports. If all the
// counters are zero, returns empty slice.
grpc_slice CreateLrsRequest(std::map<StringView /*cluster_name*/,
std::set<XdsClientStats*>, StringLess>
client_stats_map);
// Parses the LRS response and returns \a
// load_reporting_interval for client-side load reporting. If there is any
// error, the output config is invalid.
grpc_error* ParseLrsResponse(const grpc_slice& encoded_response,
std::set<std::string>* cluster_names,
grpc_millis* load_reporting_interval);
private:
DropCategoryList drop_category_list_;
const XdsBootstrap::Node* node_;
const char* build_version_;
};
struct EdsUpdate {
XdsPriorityListUpdate priority_list_update;
RefCountedPtr<XdsDropConfig> drop_config;
bool drop_all = false;
};
using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>;
// Creates a request to nack an unsupported resource type.
// Takes ownership of \a error.
grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode(
const std::string& type_url, const std::string& nonce, grpc_error* error);
// Creates an LDS request querying \a server_name.
// Takes ownership of \a error.
grpc_slice XdsLdsRequestCreateAndEncode(const std::string& server_name,
const XdsBootstrap::Node* node,
const char* build_version,
const std::string& version,
const std::string& nonce,
grpc_error* error);
// Creates an RDS request querying \a route_config_name.
// Takes ownership of \a error.
grpc_slice XdsRdsRequestCreateAndEncode(const std::string& route_config_name,
const XdsBootstrap::Node* node,
const char* build_version,
const std::string& version,
const std::string& nonce,
grpc_error* error);
// Creates a CDS request querying \a cluster_names.
// Takes ownership of \a error.
grpc_slice XdsCdsRequestCreateAndEncode(
const std::set<StringView>& cluster_names, const XdsBootstrap::Node* node,
const char* build_version, const std::string& version,
const std::string& nonce, grpc_error* error);
// Creates an EDS request querying \a eds_service_names.
// Takes ownership of \a error.
grpc_slice XdsEdsRequestCreateAndEncode(
const std::set<StringView>& eds_service_names,
const XdsBootstrap::Node* node, const char* build_version,
const std::string& version, const std::string& nonce, grpc_error* error);
// Parses the ADS response and outputs the validated update for either CDS or
// EDS. If the response can't be parsed at the top level, \a type_url will point
// to an empty string; otherwise, it will point to the received data.
grpc_error* XdsAdsResponseDecodeAndParse(
const grpc_slice& encoded_response, const std::string& expected_server_name,
const std::string& expected_route_config_name,
const std::set<StringView>& expected_eds_service_names,
LdsUpdate* lds_update, RdsUpdate* rds_update, CdsUpdateMap* cds_update_map,
EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce,
std::string* type_url);
// Creates an LRS request querying \a server_name.
grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
const XdsBootstrap::Node* node,
const char* build_version);
// Creates an LRS request sending client-side load reports. If all the counters
// are zero, returns empty slice.
grpc_slice XdsLrsRequestCreateAndEncode(
std::map<StringView /*cluster_name*/, std::set<XdsClientStats*>, StringLess>
client_stats_map);
// Parses the LRS response and returns \a
// load_reporting_interval for client-side load reporting. If there is any
// error, the output config is invalid.
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
std::set<std::string>* cluster_names,
grpc_millis* load_reporting_interval);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_API_H */

@ -187,17 +187,18 @@ class XdsClient::ChannelState::AdsCallState
gpr_log(GPR_INFO, "[xds_client %p] %s",
self->ads_calld_->xds_client(), grpc_error_string(error));
}
if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) {
if (self->type_url_ == XdsApi::kLdsTypeUrl ||
self->type_url_ == XdsApi::kRdsTypeUrl) {
self->ads_calld_->xds_client()->service_config_watcher_->OnError(
error);
} else if (self->type_url_ == kCdsTypeUrl) {
} else if (self->type_url_ == XdsApi::kCdsTypeUrl) {
ClusterState& state =
self->ads_calld_->xds_client()->cluster_map_[self->name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
} else if (self->type_url_ == kEdsTypeUrl) {
} else if (self->type_url_ == XdsApi::kEdsTypeUrl) {
EndpointState& state =
self->ads_calld_->xds_client()->endpoint_map_[self->name_];
for (const auto& p : state.watchers) {
@ -237,10 +238,10 @@ class XdsClient::ChannelState::AdsCallState
void SendMessageLocked(const std::string& type_url);
void AcceptLdsUpdate(LdsUpdate lds_update);
void AcceptRdsUpdate(RdsUpdate rds_update);
void AcceptCdsUpdate(CdsUpdateMap cds_update_map);
void AcceptEdsUpdate(EdsUpdateMap eds_update_map);
void AcceptLdsUpdate(XdsApi::LdsUpdate lds_update);
void AcceptRdsUpdate(XdsApi::RdsUpdate rds_update);
void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
static void OnRequestSent(void* arg, grpc_error* error);
static void OnRequestSentLocked(void* arg, grpc_error* error);
@ -710,13 +711,13 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx);
if (xds_client()->service_config_watcher_ != nullptr) {
Subscribe(kLdsTypeUrl, xds_client()->server_name_);
Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_);
}
for (const auto& p : xds_client()->cluster_map_) {
Subscribe(kCdsTypeUrl, std::string(p.first));
Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
}
for (const auto& p : xds_client()->endpoint_map_) {
Subscribe(kEdsTypeUrl, std::string(p.first));
Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
}
// Op: recv initial metadata.
op = ops;
@ -789,35 +790,31 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
auto& state = state_map_[type_url];
grpc_error* error = state.error;
state.error = GRPC_ERROR_NONE;
const XdsBootstrap::Node* node =
sent_initial_message_ ? nullptr : xds_client()->bootstrap_->node();
const char* build_version =
sent_initial_message_ ? nullptr : xds_client()->build_version_.get();
sent_initial_message_ = true;
grpc_slice request_payload_slice;
if (type_url == kLdsTypeUrl) {
request_payload_slice = XdsLdsRequestCreateAndEncode(
xds_client()->server_name_, node, build_version, state.version,
state.nonce, error);
if (type_url == XdsApi::kLdsTypeUrl) {
request_payload_slice = xds_client()->api_.CreateLdsRequest(
xds_client()->server_name_, state.version, state.nonce, error,
!sent_initial_message_);
state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
} else if (type_url == kRdsTypeUrl) {
request_payload_slice = XdsRdsRequestCreateAndEncode(
xds_client()->route_config_name_, node, build_version, state.version,
state.nonce, error);
} else if (type_url == XdsApi::kRdsTypeUrl) {
request_payload_slice = xds_client()->api_.CreateRdsRequest(
xds_client()->route_config_name_, state.version, state.nonce, error,
!sent_initial_message_);
state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
} else if (type_url == kCdsTypeUrl) {
request_payload_slice = XdsCdsRequestCreateAndEncode(
ClusterNamesForRequest(), node, build_version, state.version,
state.nonce, error);
} else if (type_url == kEdsTypeUrl) {
request_payload_slice = XdsEdsRequestCreateAndEncode(
EdsServiceNamesForRequest(), node, build_version, state.version,
state.nonce, error);
} else if (type_url == XdsApi::kCdsTypeUrl) {
request_payload_slice = xds_client()->api_.CreateCdsRequest(
ClusterNamesForRequest(), state.version, state.nonce, error,
!sent_initial_message_);
} else if (type_url == XdsApi::kEdsTypeUrl) {
request_payload_slice = xds_client()->api_.CreateEdsRequest(
EdsServiceNamesForRequest(), state.version, state.nonce, error,
!sent_initial_message_);
} else {
request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode(
request_payload_slice = xds_client()->api_.CreateUnsupportedTypeNackRequest(
type_url, state.nonce, state.error);
state_map_.erase(type_url);
}
sent_initial_message_ = true;
// Create message payload.
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
@ -863,7 +860,7 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
}
void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
LdsUpdate lds_update) {
XdsApi::LdsUpdate lds_update) {
const std::string& cluster_name =
lds_update.rds_update.has_value()
? lds_update.rds_update.value().cluster_name
@ -876,7 +873,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
xds_client(), lds_update.route_config_name.c_str(),
cluster_name.c_str());
}
auto& lds_state = state_map_[kLdsTypeUrl];
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
@ -906,19 +903,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
} else {
// Send RDS request for dynamic resolution.
Subscribe(kRdsTypeUrl, xds_client()->route_config_name_);
Subscribe(XdsApi::kRdsTypeUrl, xds_client()->route_config_name_);
}
}
void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
RdsUpdate rds_update) {
XdsApi::RdsUpdate rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] RDS update received: "
"cluster_name=%s",
xds_client(), rds_update.cluster_name.c_str());
}
auto& rds_state = state_map_[kRdsTypeUrl];
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
auto& state =
rds_state.subscribed_resources[xds_client()->route_config_name_];
if (state != nullptr) state->Finish();
@ -945,11 +942,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
}
void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
CdsUpdateMap cds_update_map) {
auto& cds_state = state_map_[kCdsTypeUrl];
XdsApi::CdsUpdateMap cds_update_map) {
auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
for (auto& p : cds_update_map) {
const char* cluster_name = p.first.c_str();
CdsUpdate& cds_update = p.second;
XdsApi::CdsUpdate& cds_update = p.second;
auto& state = cds_state.subscribed_resources[cluster_name];
if (state != nullptr) state->Finish();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -987,11 +984,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
}
void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
EdsUpdateMap eds_update_map) {
auto& eds_state = state_map_[kEdsTypeUrl];
XdsApi::EdsUpdateMap eds_update_map) {
auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
for (auto& p : eds_update_map) {
const char* eds_service_name = p.first.c_str();
EdsUpdate& eds_update = p.second;
XdsApi::EdsUpdate& eds_update = p.second;
auto& state = eds_state.subscribed_resources[eds_service_name];
if (state != nullptr) state->Finish();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -1035,7 +1032,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
}
for (size_t i = 0;
i < eds_update.drop_config->drop_category_list().size(); ++i) {
const XdsDropConfig::DropCategory& drop_category =
const XdsApi::DropConfig::DropCategory& drop_category =
eds_update.drop_config->drop_category_list()[i];
gpr_log(GPR_INFO,
"[xds_client %p] Drop category %s has drop rate %d per million",
@ -1046,7 +1043,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
EndpointState& endpoint_state =
xds_client()->endpoint_map_[eds_service_name];
// Ignore identical update.
const EdsUpdate& prev_update = endpoint_state.update;
const XdsApi::EdsUpdate& prev_update = endpoint_state.update;
const bool priority_list_changed =
prev_update.priority_list_update != eds_update.priority_list_update;
const bool drop_config_changed =
@ -1138,15 +1135,15 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
// mode. We will also need to cancel the timer when we receive a serverlist
// from the balancer.
// Parse the response.
LdsUpdate lds_update;
RdsUpdate rds_update;
CdsUpdateMap cds_update_map;
EdsUpdateMap eds_update_map;
XdsApi::LdsUpdate lds_update;
XdsApi::RdsUpdate rds_update;
XdsApi::CdsUpdateMap cds_update_map;
XdsApi::EdsUpdateMap eds_update_map;
std::string version;
std::string nonce;
std::string type_url;
// Note that XdsAdsResponseDecodeAndParse() also validate the response.
grpc_error* parse_error = XdsAdsResponseDecodeAndParse(
// Note that ParseAdsResponse() also validates the response.
grpc_error* parse_error = xds_client->api_.ParseAdsResponse(
response_slice, xds_client->server_name_, xds_client->route_config_name_,
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);
@ -1173,13 +1170,13 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
} else {
ads_calld->seen_response_ = true;
// Accept the ADS response according to the type_url.
if (type_url == kLdsTypeUrl) {
if (type_url == XdsApi::kLdsTypeUrl) {
ads_calld->AcceptLdsUpdate(std::move(lds_update));
} else if (type_url == kRdsTypeUrl) {
} else if (type_url == XdsApi::kRdsTypeUrl) {
ads_calld->AcceptRdsUpdate(std::move(rds_update));
} else if (type_url == kCdsTypeUrl) {
} else if (type_url == XdsApi::kCdsTypeUrl) {
ads_calld->AcceptCdsUpdate(std::move(cds_update_map));
} else if (type_url == kEdsTypeUrl) {
} else if (type_url == XdsApi::kEdsTypeUrl) {
ads_calld->AcceptEdsUpdate(std::move(eds_update_map));
}
state.version = std::move(version);
@ -1258,7 +1255,7 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
std::set<StringView>
XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
std::set<StringView> cluster_names;
for (auto& p : state_map_[kCdsTypeUrl].subscribed_resources) {
for (auto& p : state_map_[XdsApi::kCdsTypeUrl].subscribed_resources) {
cluster_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
@ -1269,7 +1266,7 @@ XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
std::set<StringView>
XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() {
std::set<StringView> eds_names;
for (auto& p : state_map_[kEdsTypeUrl].subscribed_resources) {
for (auto& p : state_map_[XdsApi::kEdsTypeUrl].subscribed_resources) {
eds_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
@ -1320,7 +1317,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Create a request that contains the load report.
grpc_slice request_payload_slice =
XdsLrsRequestCreateAndEncode(xds_client()->ClientStatsMap());
xds_client()->api_.CreateLrsRequest(xds_client()->ClientStatsMap());
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
@ -1396,9 +1393,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
nullptr, GRPC_MILLIS_INF_FUTURE, nullptr);
GPR_ASSERT(call_ != nullptr);
// Init the request payload.
grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode(
xds_client()->server_name_, xds_client()->bootstrap_->node(),
xds_client()->build_version_.get());
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsInitialRequest(xds_client()->server_name_);
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
@ -1577,7 +1573,7 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
// Parse the response.
std::set<std::string> new_cluster_names;
grpc_millis new_load_reporting_interval;
grpc_error* parse_error = XdsLrsResponseDecodeAndParse(
grpc_error* parse_error = xds_client->api_.ParseLrsResponse(
response_slice, &new_cluster_names, &new_load_reporting_interval);
if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
@ -1722,6 +1718,8 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
interested_parties_(interested_parties),
bootstrap_(XdsBootstrap::ReadFromFile(error)),
api_(bootstrap_ == nullptr ? nullptr : bootstrap_->node(),
build_version_.get()),
server_name_(server_name),
service_config_watcher_(std::move(watcher)) {
if (*error != GRPC_ERROR_NONE) {
@ -1744,7 +1742,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
if (service_config_watcher_ != nullptr) {
chand_->Subscribe(kLdsTypeUrl, std::string(server_name));
chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name));
}
}
@ -1769,7 +1767,7 @@ void XdsClient::WatchClusterData(
if (cluster_state.update.has_value()) {
w->OnClusterChanged(cluster_state.update.value());
}
chand_->Subscribe(kCdsTypeUrl, cluster_name_str);
chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
}
void XdsClient::CancelClusterDataWatch(StringView cluster_name,
@ -1782,7 +1780,7 @@ void XdsClient::CancelClusterDataWatch(StringView cluster_name,
cluster_state.watchers.erase(it);
if (cluster_state.watchers.empty()) {
cluster_map_.erase(cluster_name_str);
chand_->Unsubscribe(kCdsTypeUrl, cluster_name_str);
chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
}
}
}
@ -1799,7 +1797,7 @@ void XdsClient::WatchEndpointData(
if (!endpoint_state.update.priority_list_update.empty()) {
w->OnEndpointChanged(endpoint_state.update);
}
chand_->Subscribe(kEdsTypeUrl, eds_service_name_str);
chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
}
void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
@ -1812,7 +1810,7 @@ void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
endpoint_state.watchers.erase(it);
if (endpoint_state.watchers.empty()) {
endpoint_map_.erase(eds_service_name_str);
chand_->Unsubscribe(kEdsTypeUrl, eds_service_name_str);
chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
}
}
}

@ -56,7 +56,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
public:
virtual ~ClusterWatcherInterface() = default;
virtual void OnClusterChanged(CdsUpdate cluster_data) = 0;
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
virtual void OnError(grpc_error* error) = 0;
};
@ -66,7 +66,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
public:
virtual ~EndpointWatcherInterface() = default;
virtual void OnEndpointChanged(EdsUpdate update) = 0;
virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
virtual void OnError(grpc_error* error) = 0;
};
@ -175,7 +175,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
watchers;
// The latest data seen from CDS.
Optional<CdsUpdate> update;
Optional<XdsApi::CdsUpdate> update;
};
struct EndpointState {
@ -184,7 +184,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
watchers;
std::set<XdsClientStats*> client_stats;
// The latest data seen from EDS.
EdsUpdate update;
XdsApi::EdsUpdate update;
};
// Sends an error notification to all watchers.
@ -212,6 +212,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
grpc_pollset_set* interested_parties_;
std::unique_ptr<XdsBootstrap> bootstrap_;
XdsApi api_;
const std::string server_name_;

Loading…
Cancel
Save