|
|
|
@ -89,15 +89,14 @@ class XdsClient::Notifier { |
|
|
|
|
// watchers_list within \a work_serializer. Works with all 4 resource types.
|
|
|
|
|
template <class T> |
|
|
|
|
static void ScheduleNotifyWatchersOnErrorInWorkSerializer( |
|
|
|
|
XdsClient* xds_client, const T& watchers_list, grpc_error_handle error, |
|
|
|
|
XdsClient* xds_client, const T& watchers_list, absl::Status status, |
|
|
|
|
const DebugLocation& location) { |
|
|
|
|
xds_client->work_serializer_.Schedule( |
|
|
|
|
[watchers_list, error]() |
|
|
|
|
[watchers_list, status]() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) { |
|
|
|
|
for (const auto& p : watchers_list) { |
|
|
|
|
p.first->OnError(GRPC_ERROR_REF(error)); |
|
|
|
|
p.first->OnError(status); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
}, |
|
|
|
|
location); |
|
|
|
|
} |
|
|
|
@ -269,19 +268,16 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { |
|
|
|
|
if (error == GRPC_ERROR_NONE && timer_pending_) { |
|
|
|
|
timer_pending_ = false; |
|
|
|
|
grpc_error_handle watcher_error = |
|
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( |
|
|
|
|
"timeout obtaining resource {type=%s name=%s} from xds server", |
|
|
|
|
type_->type_url(), |
|
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
|
name_.authority, type_->type_url(), name_.key))); |
|
|
|
|
watcher_error = grpc_error_set_int( |
|
|
|
|
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
|
|
|
|
absl::Status watcher_error = absl::UnavailableError(absl::StrFormat( |
|
|
|
|
"timeout obtaining resource {type=%s name=%s} from xds server", |
|
|
|
|
type_->type_url(), |
|
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
|
name_.authority, type_->type_url(), name_.key))); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] xds server %s: %s", |
|
|
|
|
ads_calld_->xds_client(), |
|
|
|
|
ads_calld_->chand()->server_.server_uri.c_str(), |
|
|
|
|
grpc_error_std_string(watcher_error).c_str()); |
|
|
|
|
watcher_error.ToString().c_str()); |
|
|
|
|
} |
|
|
|
|
auto& authority_state = |
|
|
|
|
ads_calld_->xds_client()->authority_state_map_[name_.authority]; |
|
|
|
@ -496,7 +492,7 @@ class XdsClient::ChannelState::StateWatcher |
|
|
|
|
parent_->xds_client(), parent_->server_.server_uri.c_str(), |
|
|
|
|
status.ToString().c_str()); |
|
|
|
|
parent_->xds_client_->NotifyOnErrorLocked( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( |
|
|
|
|
absl::UnavailableError(absl::StrCat( |
|
|
|
|
"xds channel in TRANSIENT_FAILURE, connectivity error: ", |
|
|
|
|
status.ToString()))); |
|
|
|
|
} |
|
|
|
@ -603,7 +599,7 @@ bool IsLameChannel(grpc_channel* channel) { |
|
|
|
|
void XdsClient::ChannelState::StartConnectivityWatchLocked() { |
|
|
|
|
if (IsLameChannel(channel_)) { |
|
|
|
|
xds_client()->NotifyOnErrorLocked( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds client has a lame channel")); |
|
|
|
|
absl::UnavailableError("xds client has a lame channel")); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_); |
|
|
|
@ -883,10 +879,8 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( |
|
|
|
|
": validation error: ", result->resource.status().ToString())); |
|
|
|
|
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( |
|
|
|
|
xds_client(), resource_state.watchers, |
|
|
|
|
grpc_error_set_int( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( |
|
|
|
|
"invalid resource: ", result->resource.status().ToString())), |
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), |
|
|
|
|
absl::UnavailableError(absl::StrCat( |
|
|
|
|
"invalid resource: ", result->resource.status().ToString())), |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
UpdateResourceMetadataNacked(result_.version, |
|
|
|
|
result->resource.status().ToString(), |
|
|
|
@ -1321,13 +1315,11 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( |
|
|
|
|
// Try to restart the call.
|
|
|
|
|
parent_->OnCallFinishedLocked(); |
|
|
|
|
// Send error to all watchers.
|
|
|
|
|
xds_client()->NotifyOnErrorLocked( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( |
|
|
|
|
"xDS call failed: xDS server: %s, ADS call status code=%d, " |
|
|
|
|
"details='%s', error='%s'", |
|
|
|
|
chand()->server_.server_uri, status_code_, |
|
|
|
|
StringViewFromSlice(status_details_), |
|
|
|
|
grpc_error_std_string(error)))); |
|
|
|
|
xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( |
|
|
|
|
"xDS call failed: xDS server: %s, ADS call status code=%d, " |
|
|
|
|
"details='%s', error='%s'", |
|
|
|
|
chand()->server_.server_uri, status_code_, |
|
|
|
|
StringViewFromSlice(status_details_), grpc_error_std_string(error)))); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
@ -1899,7 +1891,7 @@ void XdsClient::WatchResource(const XdsResourceType* type, |
|
|
|
|
RefCountedPtr<ResourceWatcherInterface> watcher) { |
|
|
|
|
ResourceWatcherInterface* w = watcher.get(); |
|
|
|
|
// Lambda for handling failure cases.
|
|
|
|
|
auto fail = [&](grpc_error_handle error) mutable { |
|
|
|
|
auto fail = [&](absl::Status status) mutable { |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
MaybeRegisterResourceTypeLocked(type); |
|
|
|
@ -1908,14 +1900,14 @@ void XdsClient::WatchResource(const XdsResourceType* type, |
|
|
|
|
work_serializer_.Run( |
|
|
|
|
// TODO(yashykt): When we move to C++14, capture watcher using
|
|
|
|
|
// std::move()
|
|
|
|
|
[watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { |
|
|
|
|
watcher->OnError(error); |
|
|
|
|
[watcher, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { |
|
|
|
|
watcher->OnError(status); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
}; |
|
|
|
|
auto resource_name = ParseXdsResourceName(name, type); |
|
|
|
|
if (!resource_name.ok()) { |
|
|
|
|
fail(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( |
|
|
|
|
fail(absl::UnavailableError(absl::StrFormat( |
|
|
|
|
"Unable to parse resource name for listener %s", name))); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -1925,7 +1917,7 @@ void XdsClient::WatchResource(const XdsResourceType* type, |
|
|
|
|
if (absl::ConsumePrefix(&authority_name, "xdstp:")) { |
|
|
|
|
auto* authority = bootstrap_->LookupAuthority(std::string(authority_name)); |
|
|
|
|
if (authority == nullptr) { |
|
|
|
|
fail(GRPC_ERROR_CREATE_FROM_CPP_STRING( |
|
|
|
|
fail(absl::UnavailableError( |
|
|
|
|
absl::StrCat("authority \"", authority_name, |
|
|
|
|
"\" not present in bootstrap config"))); |
|
|
|
|
return; |
|
|
|
@ -2206,7 +2198,7 @@ void XdsClient::ResetBackoff() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) { |
|
|
|
|
void XdsClient::NotifyOnErrorLocked(absl::Status status) { |
|
|
|
|
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers; |
|
|
|
|
for (const auto& a : authority_state_map_) { // authority
|
|
|
|
|
for (const auto& t : a.second.resource_map) { // type
|
|
|
|
@ -2220,11 +2212,10 @@ void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) { |
|
|
|
|
work_serializer_.Schedule( |
|
|
|
|
// TODO(yashykt): When we move to C++14, capture watchers using
|
|
|
|
|
// std::move()
|
|
|
|
|
[watchers, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { |
|
|
|
|
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { |
|
|
|
|
for (const auto& watcher : watchers) { |
|
|
|
|
watcher->OnError(GRPC_ERROR_REF(error)); |
|
|
|
|
watcher->OnError(status); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|