xds: handle percent-encoding in new-style resource names (#28515)

reviewable/pr28504/r2
Mark D. Roth 3 years ago committed by GitHub
parent d61b564ac0
commit 5c30de312b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 210
      src/core/ext/xds/xds_client.cc
  2. 19
      src/core/ext/xds/xds_client.h
  3. 19
      src/core/ext/xds/xds_server_config_fetcher.cc
  4. 47
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -191,7 +191,7 @@ class XdsClient::ChannelState::AdsCallState
std::string version; std::string version;
std::string nonce; std::string nonce;
std::vector<std::string> errors; std::vector<std::string> errors;
std::map<std::string /*authority*/, std::set<std::string /*id*/>> std::map<std::string /*authority*/, std::set<XdsResourceKey>>
resources_seen; resources_seen;
bool have_valid_resources = false; bool have_valid_resources = false;
}; };
@ -270,16 +270,18 @@ class XdsClient::ChannelState::AdsCallState
"timeout obtaining resource {type=%s name=%s} from xds server", "timeout obtaining resource {type=%s name=%s} from xds server",
type_->type_url(), type_->type_url(),
XdsClient::ConstructFullXdsResourceName( XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.id))); name_.authority, type_->type_url(), name_.key)));
watcher_error = grpc_error_set_int( watcher_error = grpc_error_set_int(
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(), gpr_log(GPR_INFO, "[xds_client %p] xds server %s: %s",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
grpc_error_std_string(watcher_error).c_str()); grpc_error_std_string(watcher_error).c_str());
} }
auto& authority_state = auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority]; ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.id]; ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers, watcher_error, ads_calld_->xds_client(), state.watchers, watcher_error,
@ -307,7 +309,7 @@ class XdsClient::ChannelState::AdsCallState
// Subscribed resources of this type. // Subscribed resources of this type.
std::map<std::string /*authority*/, std::map<std::string /*authority*/,
std::map<std::string /*name*/, OrphanablePtr<ResourceTimer>>> std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
subscribed_resources; subscribed_resources;
}; };
@ -485,9 +487,10 @@ class XdsClient::ChannelState::StateWatcher
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Notify all watchers of error. // In TRANSIENT_FAILURE. Notify all watchers of error.
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] xds channel in state:TRANSIENT_FAILURE " "[xds_client %p] xds channel for server %s in "
"status_message:(%s)", "state TRANSIENT_FAILURE: %s",
parent_->xds_client(), status.ToString().c_str()); parent_->xds_client(), parent_->server_.server_uri.c_str(),
status.ToString().c_str());
parent_->xds_client_->NotifyOnErrorLocked( parent_->xds_client_->NotifyOnErrorLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds channel in TRANSIENT_FAILURE")); "xds channel in TRANSIENT_FAILURE"));
@ -535,8 +538,8 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
XdsClient::ChannelState::~ChannelState() { XdsClient::ChannelState::~ChannelState() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(), gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
this); xds_client(), this, server_.server_uri.c_str());
} }
grpc_channel_destroy(channel_); grpc_channel_destroy(channel_);
xds_client_.reset(DEBUG_LOCATION, "ChannelState"); xds_client_.reset(DEBUG_LOCATION, "ChannelState");
@ -676,10 +679,10 @@ void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
GPR_ASSERT(chand_->channel_ != nullptr); GPR_ASSERT(chand_->channel_ != nullptr);
GPR_ASSERT(calld_ == nullptr); GPR_ASSERT(calld_ == nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(
"[xds_client %p] Start new call from retryable call (chand: %p, " GPR_INFO,
"retryable call: %p)", "[xds_client %p] xds server %s: start new call from retryable call %p",
chand()->xds_client(), chand(), this); chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
} }
calld_ = MakeOrphanable<T>( calld_ = MakeOrphanable<T>(
this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call")); this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
@ -693,9 +696,10 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
grpc_millis timeout = grpc_millis timeout =
std::max(next_attempt_time - ExecCtx::Get()->Now(), grpc_millis(0)); std::max(next_attempt_time - ExecCtx::Get()->Now(), grpc_millis(0));
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] Failed to connect to xds server (chand: %p) " "[xds_client %p] xds server %s: call attempt failed; "
"retry timer will fire in %" PRId64 "ms.", "retry timer will fire in %" PRId64 "ms.",
chand()->xds_client(), chand(), timeout); chand()->xds_client(), chand()->server_.server_uri.c_str(),
timeout);
} }
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release(); this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
@ -719,10 +723,10 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
retry_timer_callback_pending_ = false; retry_timer_callback_pending_ = false;
if (!shutting_down_ && error == GRPC_ERROR_NONE) { if (!shutting_down_ && error == GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log( gpr_log(GPR_INFO,
GPR_INFO, "[xds_client %p] xds server %s: retry timer fired (retryable "
"[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)", "call: %p)",
chand()->xds_client(), chand(), this); chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
} }
StartNewCallLocked(); StartNewCallLocked();
} }
@ -736,11 +740,14 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser:: absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ProcessAdsResponseFields(AdsResponseFields fields) { ProcessAdsResponseFields(AdsResponseFields fields) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(
"[xds_client %p] received ADS response: type_url=%s, " GPR_INFO,
"version=%s, nonce=%s, num_resources=%" PRIuPTR, "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
ads_call_state_->xds_client(), fields.type_url.c_str(), "version=%s, nonce=%s, num_resources=%" PRIuPTR,
fields.version.c_str(), fields.nonce.c_str(), fields.num_resources); ads_call_state_->xds_client(),
ads_call_state_->chand()->server_.server_uri.c_str(),
fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
fields.num_resources);
} }
result_.type = result_.type =
ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url); ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url);
@ -815,7 +822,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
auto it = auto it =
timer_it->second.subscribed_resources.find(resource_name->authority); timer_it->second.subscribed_resources.find(resource_name->authority);
if (it != timer_it->second.subscribed_resources.end()) { if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(resource_name->id); auto res_it = it->second.find(resource_name->key);
if (res_it != it->second.end()) { if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer(); res_it->second->MaybeCancelTimer();
} }
@ -834,15 +841,15 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
return; // Skip resource -- we don't have a subscription for it. return; // Skip resource -- we don't have a subscription for it.
} }
auto& type_map = type_it->second; auto& type_map = type_it->second;
// Found type, so look up resource id. // Found type, so look up resource key.
auto it = type_map.find(resource_name->id); auto it = type_map.find(resource_name->key);
if (it == type_map.end()) { if (it == type_map.end()) {
return; // Skip resource -- we don't have a subscription for it. return; // Skip resource -- we don't have a subscription for it.
} }
ResourceState& resource_state = it->second; ResourceState& resource_state = it->second;
// If needed, record that we've seen this resource. // If needed, record that we've seen this resource.
if (result_.type->AllResourcesRequiredInSotW()) { if (result_.type->AllResourcesRequiredInSotW()) {
result_.resources_seen[resource_name->authority].insert(resource_name->id); result_.resources_seen[resource_name->authority].insert(resource_name->key);
} }
// Update resource state based on whether the resource is valid. // Update resource state based on whether the resource is valid.
if (!result->resource.ok()) { if (!result->resource.ok()) {
@ -927,9 +934,9 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// Start the call. // Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] Starting ADS call (chand: %p, calld: %p, " "[xds_client %p] xds server %s: starting ADS call "
"call: %p)", "(calld: %p, call: %p)",
xds_client(), chand(), this, call_); xds_client(), chand()->server_.server_uri.c_str(), this, call_);
} }
// Create the ops. // Create the ops.
grpc_call_error call_error; grpc_call_error call_error;
@ -956,8 +963,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
for (const auto& t : a.second.resource_map) { for (const auto& t : a.second.resource_map) {
const XdsResourceType* type = t.first; const XdsResourceType* type = t.first;
for (const auto& r : t.second) { for (const auto& r : t.second) {
const std::string& resource_id = r.first; const XdsResourceKey& resource_key = r.first;
SubscribeLocked(type, {authority, resource_id}); SubscribeLocked(type, {authority, resource_key});
} }
} }
} }
@ -1041,9 +1048,10 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
sent_initial_message_ = true; sent_initial_message_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] sending ADS request: type=%s version=%s nonce=%s " "[xds_client %p] xds server %s: sending ADS request: type=%s "
"error=%s", "version=%s nonce=%s error=%s",
xds_client(), std::string(type->type_url()).c_str(), xds_client(), chand()->server_.server_uri.c_str(),
std::string(type->type_url()).c_str(),
chand()->resource_type_version_map_[type].c_str(), chand()->resource_type_version_map_[type].c_str(),
state.nonce.c_str(), grpc_error_std_string(state.error).c_str()); state.nonce.c_str(), grpc_error_std_string(state.error).c_str());
} }
@ -1065,15 +1073,17 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xds_client %p] calld=%p call_error=%d sending ADS message", "[xds_client %p] xds server %s: error starting ADS send_message "
xds_client(), this, call_error); "batch on calld=%p: call_error=%d",
xds_client(), chand()->server_.server_uri.c_str(), this,
call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
} }
void XdsClient::ChannelState::AdsCallState::SubscribeLocked( void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
const XdsResourceType* type, const XdsResourceName& name) { const XdsResourceType* type, const XdsResourceName& name) {
auto& state = state_map_[type].subscribed_resources[name.authority][name.id]; auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
if (state == nullptr) { if (state == nullptr) {
state = MakeOrphanable<ResourceTimer>(type, name); state = MakeOrphanable<ResourceTimer>(type, name);
SendMessageLocked(type); SendMessageLocked(type);
@ -1085,7 +1095,7 @@ void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
bool delay_unsubscription) { bool delay_unsubscription) {
auto& type_state_map = state_map_[type]; auto& type_state_map = state_map_[type];
auto& authority_map = type_state_map.subscribed_resources[name.authority]; auto& authority_map = type_state_map.subscribed_resources[name.authority];
authority_map.erase(name.id); authority_map.erase(name.key);
if (authority_map.empty()) { if (authority_map.empty()) {
type_state_map.subscribed_resources.erase(name.authority); type_state_map.subscribed_resources.erase(name.authority);
} }
@ -1165,8 +1175,10 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
if (!status.ok()) { if (!status.ok()) {
// Ignore unparsable response. // Ignore unparsable response.
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xds_client %p] Error parsing ADS response (%s) -- ignoring", "[xds_client %p] xds server %s: error parsing ADS response (%s) "
xds_client(), status.ToString().c_str()); "-- ignoring",
xds_client(), chand()->server_.server_uri.c_str(),
status.ToString().c_str());
} else { } else {
AdsResponseParser::Result result = parser.TakeResult(); AdsResponseParser::Result result = parser.TakeResult();
// Update nonce. // Update nonce.
@ -1175,11 +1187,13 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// If we got an error, set state.error so that we'll NACK the update. // If we got an error, set state.error so that we'll NACK the update.
if (!result.errors.empty()) { if (!result.errors.empty()) {
std::string error = absl::StrJoin(result.errors, "; "); std::string error = absl::StrJoin(result.errors, "; ");
gpr_log(GPR_ERROR, gpr_log(
"[xds_client %p] ADS response invalid for resource type %s " GPR_ERROR,
"version %s, will NACK: nonce=%s error=%s", "[xds_client %p] xds server %s: ADS response invalid for resource "
xds_client(), result.type_url.c_str(), result.version.c_str(), "type %s version %s, will NACK: nonce=%s error=%s",
state.nonce.c_str(), error.c_str()); xds_client(), chand()->server_.server_uri.c_str(),
result.type_url.c_str(), result.version.c_str(), state.nonce.c_str(),
error.c_str());
GRPC_ERROR_UNREF(state.error); GRPC_ERROR_UNREF(state.error);
state.error = grpc_error_set_int( state.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error)), GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error)),
@ -1198,10 +1212,10 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
if (type_it == authority_state.resource_map.end()) continue; if (type_it == authority_state.resource_map.end()) continue;
// Iterate over resource ids. // Iterate over resource ids.
for (auto& r : type_it->second) { for (auto& r : type_it->second) {
const std::string& resource_id = r.first; const XdsResourceKey& resource_key = r.first;
ResourceState& resource_state = r.second; ResourceState& resource_state = r.second;
if (seen_authority_it == result.resources_seen.end() || if (seen_authority_it == result.resources_seen.end() ||
seen_authority_it->second.find(resource_id) == seen_authority_it->second.find(resource_key) ==
seen_authority_it->second.end()) { seen_authority_it->second.end()) {
// If the resource was newly requested but has not yet been // If the resource was newly requested but has not yet been
// received, we don't want to generate an error for the watchers, // received, we don't want to generate an error for the watchers,
@ -1266,9 +1280,11 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(status_details_); char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] ADS call status received. Status = %d, details " "[xds_client %p] xds server %s: ADS call status received "
"= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'", "(chand=%p, ads_calld=%p, call=%p): "
xds_client(), status_code_, status_details, chand(), this, call_, "status=%d, details='%s', error='%s'",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
call_, status_code_, status_details,
grpc_error_std_string(error).c_str()); grpc_error_std_string(error).c_str());
gpr_free(status_details); gpr_free(status_details);
} }
@ -1299,9 +1315,9 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
for (auto& a : it->second.subscribed_resources) { for (auto& a : it->second.subscribed_resources) {
const std::string& authority = a.first; const std::string& authority = a.first;
for (auto& p : a.second) { for (auto& p : a.second) {
const std::string& resource_id = p.first; const XdsResourceKey& resource_key = p.first;
resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName( resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
authority, type->type_url(), resource_id)); authority, type->type_url(), resource_key));
OrphanablePtr<ResourceTimer>& resource_timer = p.second; OrphanablePtr<ResourceTimer>& resource_timer = p.second;
resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer")); resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
} }
@ -1397,8 +1413,10 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
parent_->call_, &op, 1, &on_report_done_); parent_->call_, &op, 1, &on_report_done_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xds_client %p] calld=%p call_error=%d sending client load report", "[xds_client %p] xds server %s: error starting LRS send_message "
xds_client(), this, call_error); "batch on calld=%p: call_error=%d",
xds_client(), parent_->chand()->server_.server_uri.c_str(), this,
call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
return false; return false;
@ -1474,10 +1492,10 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
grpc_metadata_array_init(&trailing_metadata_recv_); grpc_metadata_array_init(&trailing_metadata_recv_);
// Start the call. // Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(
"[xds_client %p] Starting LRS call (chand: %p, calld: %p, " GPR_INFO,
"call: %p)", "[xds_client %p] xds server %s: starting LRS call (calld=%p, call=%p)",
xds_client(), chand(), this, call_); xds_client(), chand()->server_.server_uri.c_str(), this, call_);
} }
// Create the ops. // Create the ops.
grpc_call_error call_error; grpc_call_error call_error;
@ -1637,8 +1655,9 @@ bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
&new_load_reporting_interval); &new_load_reporting_interval);
if (parse_error != GRPC_ERROR_NONE) { if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xds_client %p] LRS response parsing failed. error=%s", "[xds_client %p] xds server %s: LRS response parsing failed: %s",
xds_client(), grpc_error_std_string(parse_error).c_str()); xds_client(), chand()->server_.server_uri.c_str(),
grpc_error_std_string(parse_error).c_str());
GRPC_ERROR_UNREF(parse_error); GRPC_ERROR_UNREF(parse_error);
return; return;
} }
@ -1646,10 +1665,11 @@ bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log( gpr_log(
GPR_INFO, GPR_INFO,
"[xds_client %p] LRS response received, %" PRIuPTR "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
"ms", "ms",
xds_client(), new_cluster_names.size(), send_all_clusters, xds_client(), chand()->server_.server_uri.c_str(),
new_cluster_names.size(), send_all_clusters,
new_load_reporting_interval); new_load_reporting_interval);
size_t i = 0; size_t i = 0;
for (const auto& name : new_cluster_names) { for (const auto& name : new_cluster_names) {
@ -1663,9 +1683,10 @@ bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS; GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] Increased load_report_interval to minimum " "[xds_client %p] xds server %s: increased load_report_interval "
"value %dms", "to minimum value %dms",
xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); xds_client(), chand()->server_.server_uri.c_str(),
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
} }
} }
// Ignore identical update. // Ignore identical update.
@ -1673,10 +1694,11 @@ bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
cluster_names_ == new_cluster_names && cluster_names_ == new_cluster_names &&
load_reporting_interval_ == new_load_reporting_interval) { load_reporting_interval_ == new_load_reporting_interval) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(
"[xds_client %p] Incoming LRS response identical to current, " GPR_INFO,
"ignoring.", "[xds_client %p] xds server %s: incoming LRS response identical "
xds_client()); "to current, ignoring.",
xds_client(), chand()->server_.server_uri.c_str());
} }
return; return;
} }
@ -1722,9 +1744,11 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(status_details_); char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] LRS call status received. Status = %d, details " "[xds_client %p] xds server %s: LRS call status received "
"= '%s', (chand: %p, calld: %p, call: %p), error '%s'", "(chand=%p, calld=%p, call=%p): "
xds_client(), status_code_, status_details, chand(), this, call_, "status=%d, details='%s', error='%s'",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
call_, status_code_, status_details,
grpc_error_std_string(error).c_str()); grpc_error_std_string(error).c_str());
gpr_free(status_details); gpr_free(status_details);
} }
@ -1879,7 +1903,7 @@ void XdsClient::WatchResource(const XdsResourceType* type,
AuthorityState& authority_state = AuthorityState& authority_state =
authority_state_map_[resource_name->authority]; authority_state_map_[resource_name->authority];
ResourceState& resource_state = ResourceState& resource_state =
authority_state.resource_map[type][resource_name->id]; authority_state.resource_map[type][resource_name->key];
resource_state.watchers[w] = watcher; resource_state.watchers[w] = watcher;
// If we already have a cached value for the resource, notify the new // If we already have a cached value for the resource, notify the new
// watcher immediately. // watcher immediately.
@ -1927,8 +1951,8 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
auto type_it = authority_state.resource_map.find(type); auto type_it = authority_state.resource_map.find(type);
if (type_it == authority_state.resource_map.end()) return; if (type_it == authority_state.resource_map.end()) return;
auto& type_map = type_it->second; auto& type_map = type_it->second;
// Find resource id. // Find resource key.
auto resource_it = type_map.find(resource_name->id); auto resource_it = type_map.find(resource_name->key);
if (resource_it == type_map.end()) return; if (resource_it == type_map.end()) return;
ResourceState& resource_state = resource_it->second; ResourceState& resource_state = resource_it->second;
// Remove watcher. // Remove watcher.
@ -1973,7 +1997,7 @@ absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
// Old-style names use the empty string for authority. // Old-style names use the empty string for authority.
// authority is prefixed with "old:" to indicate that it's an old-style name. // authority is prefixed with "old:" to indicate that it's an old-style name.
if (!absl::StartsWith(name, "xdstp:")) { if (!absl::StartsWith(name, "xdstp:")) {
return XdsResourceName{"old:", std::string(name)}; return XdsResourceName{"old:", {std::string(name), {}}};
} }
// New style name. Parse URI. // New style name. Parse URI.
auto uri = URI::Parse(name); auto uri = URI::Parse(name);
@ -1985,23 +2009,29 @@ absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
return absl::InvalidArgumentError( return absl::InvalidArgumentError(
"xdstp URI path must indicate valid xDS resource type"); "xdstp URI path must indicate valid xDS resource type");
} }
std::vector<std::pair<absl::string_view, absl::string_view>> query_parameters( // Canonicalize order of query params.
uri->query_parameter_map().begin(), uri->query_parameter_map().end()); std::vector<URI::QueryParam> query_params;
std::sort(query_parameters.begin(), query_parameters.end()); for (const auto& p : uri->query_parameter_map()) {
query_params.emplace_back(
URI::QueryParam{std::string(p.first), std::string(p.second)});
}
return XdsResourceName{ return XdsResourceName{
absl::StrCat("xdstp:", uri->authority()), absl::StrCat("xdstp:", uri->authority()),
absl::StrCat( {std::string(path_parts.second), std::move(query_params)}};
path_parts.second, (!query_parameters.empty() ? "?" : ""),
absl::StrJoin(query_parameters, "&", absl::PairFormatter("=")))};
} }
std::string XdsClient::ConstructFullXdsResourceName( std::string XdsClient::ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type, absl::string_view authority, absl::string_view resource_type,
absl::string_view id) { const XdsResourceKey& key) {
if (absl::ConsumePrefix(&authority, "xdstp:")) { if (absl::ConsumePrefix(&authority, "xdstp:")) {
return absl::StrCat("xdstp://", authority, "/", resource_type, "/", id); auto uri = URI::Create("xdstp", std::string(authority),
absl::StrCat("/", resource_type, "/", key.id),
key.query_params, /*fragment=*/"");
GPR_ASSERT(uri.ok());
return uri->ToString();
} }
return std::string(id); // Old-style name.
return key.id;
} }
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
@ -2252,10 +2282,10 @@ std::string XdsClient::DumpClientConfigBinary() {
auto& resource_metadata_map = auto& resource_metadata_map =
resource_type_metadata_map[type->type_url()]; resource_type_metadata_map[type->type_url()];
for (const auto& r : t.second) { // resource id for (const auto& r : t.second) { // resource id
const std::string& resource_id = r.first; const XdsResourceKey& resource_key = r.first;
const ResourceState& resource_state = r.second; const ResourceState& resource_state = r.second;
resource_metadata_map[ConstructFullXdsResourceName( resource_metadata_map[ConstructFullXdsResourceName(
authority, type->type_url(), resource_id)] = &resource_state.meta; authority, type->type_url(), resource_key)] = &resource_state.meta;
} }
} }
} }

@ -37,6 +37,7 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core { namespace grpc_core {
@ -150,9 +151,20 @@ class XdsClient : public DualRefCounted<XdsClient> {
const grpc_channel_args& args); const grpc_channel_args& args);
private: private:
struct XdsResourceKey {
std::string id;
std::vector<URI::QueryParam> query_params;
bool operator<(const XdsResourceKey& other) const {
int c = id.compare(other.id);
if (c != 0) return c < 0;
return query_params < other.query_params;
}
};
struct XdsResourceName { struct XdsResourceName {
std::string authority; std::string authority;
std::string id; XdsResourceKey key;
}; };
// Contains a channel to the xds server and all the data related to the // Contains a channel to the xds server and all the data related to the
@ -225,8 +237,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
struct AuthorityState { struct AuthorityState {
RefCountedPtr<ChannelState> channel_state; RefCountedPtr<ChannelState> channel_state;
std::map<const XdsResourceType*, std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
std::map<std::string /*id*/, ResourceState>>
resource_map; resource_map;
}; };
@ -261,7 +272,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
absl::string_view name, const XdsResourceType* type); absl::string_view name, const XdsResourceType* type);
static std::string ConstructFullXdsResourceName( static std::string ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type, absl::string_view authority, absl::string_view resource_type,
absl::string_view id); const XdsResourceKey& key);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) bool send_all_clusters, const std::set<std::string>& clusters)

@ -416,6 +416,17 @@ XdsServerConfigFetcher::XdsServerConfigFetcher(
GPR_ASSERT(xds_client_ != nullptr); GPR_ASSERT(xds_client_ != nullptr);
} }
std::string ListenerResourceName(absl::string_view resource_name_template,
absl::string_view listening_address) {
std::string tmp;
if (absl::StartsWith(resource_name_template, "xdstp:")) {
tmp = URI::PercentEncodePath(listening_address);
listening_address = tmp;
}
return absl::StrReplaceAll(resource_name_template,
{{"%s", listening_address}});
}
void XdsServerConfigFetcher::StartWatch( void XdsServerConfigFetcher::StartWatch(
std::string listening_address, std::string listening_address,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher) { std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher) {
@ -426,9 +437,9 @@ void XdsServerConfigFetcher::StartWatch(
auto* listener_watcher_ptr = listener_watcher.get(); auto* listener_watcher_ptr = listener_watcher.get();
XdsListenerResourceType::StartWatch( XdsListenerResourceType::StartWatch(
xds_client_.get(), xds_client_.get(),
absl::StrReplaceAll( ListenerResourceName(
xds_client_->bootstrap().server_listener_resource_name_template(), xds_client_->bootstrap().server_listener_resource_name_template(),
{{"%s", listening_address}}), listening_address),
std::move(listener_watcher)); std::move(listener_watcher));
MutexLock lock(&mu_); MutexLock lock(&mu_);
listener_watchers_.emplace(watcher_ptr, listener_watcher_ptr); listener_watchers_.emplace(watcher_ptr, listener_watcher_ptr);
@ -442,9 +453,9 @@ void XdsServerConfigFetcher::CancelWatch(
// Cancel the watch on the listener before erasing // Cancel the watch on the listener before erasing
XdsListenerResourceType::CancelWatch( XdsListenerResourceType::CancelWatch(
xds_client_.get(), xds_client_.get(),
absl::StrReplaceAll( ListenerResourceName(
xds_client_->bootstrap().server_listener_resource_name_template(), xds_client_->bootstrap().server_listener_resource_name_template(),
{{"%s", it->second->listening_address()}}), it->second->listening_address()),
it->second, false /* delay_unsubscription */); it->second, false /* delay_unsubscription */);
listener_watchers_.erase(it); listener_watchers_.erase(it);
} }

@ -739,10 +739,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
client_load_reporting_interval_seconds), client_load_reporting_interval_seconds),
xds_resource_does_not_exist_timeout_ms_( xds_resource_does_not_exist_timeout_ms_(
xds_resource_does_not_exist_timeout_ms), xds_resource_does_not_exist_timeout_ms),
use_xds_enabled_server_(use_xds_enabled_server) {} use_xds_enabled_server_(use_xds_enabled_server) {
void CreateClientsAndServers(BootstrapBuilder builder = BootstrapBuilder(),
std::string lb_expected_authority = "") {
bool localhost_resolves_to_ipv4 = false; bool localhost_resolves_to_ipv4 = false;
bool localhost_resolves_to_ipv6 = false; bool localhost_resolves_to_ipv6 = false;
grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4, grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
@ -792,6 +789,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
->add_filters() ->add_filters()
->mutable_typed_config() ->mutable_typed_config()
->PackFrom(http_connection_manager); ->PackFrom(http_connection_manager);
}
void CreateClientsAndServers(BootstrapBuilder builder = BootstrapBuilder(),
std::string lb_expected_authority = "") {
// Create the backends but don't start them yet. We need to create the // Create the backends but don't start them yet. We need to create the
// backends to allocate the ports, so that the xDS servers know what // backends to allocate the ports, so that the xDS servers know what
// default resources to populate when we create them. However, we can't // default resources to populate when we create them. However, we can't
@ -803,6 +804,17 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
} }
// Start the load balancer. // Start the load balancer.
balancer_ = CreateAndStartBalancer(); balancer_ = CreateAndStartBalancer();
// Initialize resources on balancer.
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
default_route_config_);
if (use_xds_enabled_server_) {
for (const auto& backend : backends_) {
SetServerListenerNameAndRouteConfiguration(
balancer_.get(), default_server_listener_, backend->port(),
default_server_route_config_);
}
}
balancer_->ads_service()->SetCdsResource(default_cluster_);
// Create fake resolver response generators used by client. // Create fake resolver response generators used by client.
logical_dns_cluster_resolver_response_generator_ = logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>(); grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
@ -1793,19 +1805,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
(GetParam().enable_load_reporting() (GetParam().enable_load_reporting()
? test_obj->client_load_reporting_interval_seconds_ ? test_obj->client_load_reporting_interval_seconds_
: 0), : 0),
{kDefaultClusterName})) { {kDefaultClusterName})) {}
// Initialize resources.
test_obj->SetListenerAndRouteConfiguration(
this, test_obj->default_listener_, test_obj->default_route_config_);
if (test_obj->use_xds_enabled_server_) {
for (const auto& backend : test_obj->backends_) {
test_obj->SetServerListenerNameAndRouteConfiguration(
this, test_obj->default_server_listener_, backend->port(),
test_obj->default_server_route_config_);
}
}
ads_service_->SetCdsResource(test_obj->default_cluster_);
}
AdsServiceImpl* ads_service() { return ads_service_.get(); } AdsServiceImpl* ads_service() { return ads_service_.get(); }
LrsServiceImpl* lrs_service() { return lrs_service_.get(); } LrsServiceImpl* lrs_service() { return lrs_service_.get(); }
@ -2628,9 +2628,10 @@ TEST_P(XdsFederationTest, FederationTargetNoAuthorityWithResourceTemplate) {
TEST_P(XdsFederationTest, FederationTargetAuthorityDefaultResourceTemplate) { TEST_P(XdsFederationTest, FederationTargetAuthorityDefaultResourceTemplate) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true"); gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true");
const char* kAuthority = "xds.example.com"; const char* kAuthority = "xds.example.com";
const char* kNewServerName = "whee%/server.example.com";
const char* kNewListenerName = const char* kNewListenerName =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/" "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"server.example.com"; "whee%25/server.example.com";
const char* kNewRouteConfigName = const char* kNewRouteConfigName =
"xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/" "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
"new_route_config_name"; "new_route_config_name";
@ -2674,7 +2675,7 @@ TEST_P(XdsFederationTest, FederationTargetAuthorityDefaultResourceTemplate) {
WaitForAllBackends(0, 1); WaitForAllBackends(0, 1);
// Create second channel to new target uri and send 1 RPC . // Create second channel to new target uri and send 1 RPC .
auto channel2 = auto channel2 =
CreateChannel(/*failover_timeout=*/0, kServerName, kAuthority); CreateChannel(/*failover_timeout=*/0, kNewServerName, kAuthority);
channel2->GetState(/*try_to_connect=*/true); channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE( ASSERT_TRUE(
channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100))); channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
@ -2698,12 +2699,13 @@ TEST_P(XdsFederationTest, FederationTargetAuthorityDefaultResourceTemplate) {
TEST_P(XdsFederationTest, FederationTargetAuthorityWithResourceTemplate) { TEST_P(XdsFederationTest, FederationTargetAuthorityWithResourceTemplate) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true"); gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true");
const char* kAuthority = "xds.example.com"; const char* kAuthority = "xds.example.com";
const char* kNewServerName = "whee%/server.example.com";
const char* kNewListenerTemplate = const char* kNewListenerTemplate =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/" "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"client/%s?psm_project_id=1234"; "client/%s?psm_project_id=1234";
const char* kNewListenerName = const char* kNewListenerName =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/" "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"client/server.example.com?psm_project_id=1234"; "client/whee%25/server.example.com?psm_project_id=1234";
const char* kNewRouteConfigName = const char* kNewRouteConfigName =
"xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/" "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
"new_route_config_name"; "new_route_config_name";
@ -2748,7 +2750,7 @@ TEST_P(XdsFederationTest, FederationTargetAuthorityWithResourceTemplate) {
WaitForAllBackends(0, 1); WaitForAllBackends(0, 1);
// Create second channel to new target uri and send 1 RPC . // Create second channel to new target uri and send 1 RPC .
auto channel2 = auto channel2 =
CreateChannel(/*failover_timeout=*/0, kServerName, kAuthority); CreateChannel(/*failover_timeout=*/0, kNewServerName, kAuthority);
channel2->GetState(/*try_to_connect=*/true); channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE( ASSERT_TRUE(
channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100))); channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
@ -2829,7 +2831,8 @@ TEST_P(XdsFederationTest, FederationServer) {
Listener server_listener = default_server_listener_; Listener server_listener = default_server_listener_;
server_listener.set_name(absl::StrCat( server_listener.set_name(absl::StrCat(
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/server/", "xdstp://xds.example.com/envoy.config.listener.v3.Listener/server/",
ipv6_only_ ? "[::1]:" : "127.0.0.1:", port, "?psm_project_id=1234")); ipv6_only_ ? "%5B::1%5D:" : "127.0.0.1:", port,
"?psm_project_id=1234"));
server_listener.mutable_address()->mutable_socket_address()->set_port_value( server_listener.mutable_address()->mutable_socket_address()->set_port_value(
port); port);
authority_balancer_->ads_service()->SetLdsResource(server_listener); authority_balancer_->ads_service()->SetLdsResource(server_listener);

Loading…
Cancel
Save