|
|
@ -233,7 +233,7 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
"[xds_client %p] xds server %s: timeout obtaining resource " |
|
|
|
"[xds_client %p] xds server %s: timeout obtaining resource " |
|
|
|
"{type=%s name=%s} from xds server", |
|
|
|
"{type=%s name=%s} from xds server", |
|
|
|
ads_calld_->xds_client(), |
|
|
|
ads_calld_->xds_client(), |
|
|
|
ads_calld_->chand()->server_.server_uri().c_str(), |
|
|
|
ads_calld_->chand()->server_.server_uri.c_str(), |
|
|
|
std::string(type_->type_url()).c_str(), |
|
|
|
std::string(type_->type_url()).c_str(), |
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
name_.authority, type_->type_url(), name_.key) |
|
|
|
name_.authority, type_->type_url(), name_.key) |
|
|
@ -426,7 +426,7 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client, |
|
|
|
server_(server) { |
|
|
|
server_(server) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", |
|
|
|
xds_client_.get(), server.server_uri().c_str()); |
|
|
|
xds_client_.get(), server.server_uri.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
absl::Status status; |
|
|
|
absl::Status status; |
|
|
|
transport_ = xds_client_->transport_factory_->Create( |
|
|
|
transport_ = xds_client_->transport_factory_->Create( |
|
|
@ -443,7 +443,7 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client, |
|
|
|
XdsClient::ChannelState::~ChannelState() { |
|
|
|
XdsClient::ChannelState::~ChannelState() { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s", |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s", |
|
|
|
xds_client(), this, server_.server_uri().c_str()); |
|
|
|
xds_client(), this, server_.server_uri.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
xds_client_.reset(DEBUG_LOCATION, "ChannelState"); |
|
|
|
xds_client_.reset(DEBUG_LOCATION, "ChannelState"); |
|
|
|
} |
|
|
|
} |
|
|
@ -458,7 +458,7 @@ void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { |
|
|
|
// At this time, all strong refs are removed, remove from channel map to
|
|
|
|
// At this time, all strong refs are removed, remove from channel map to
|
|
|
|
// prevent subsequent subscription from trying to use this ChannelState as
|
|
|
|
// prevent subsequent subscription from trying to use this ChannelState as
|
|
|
|
// it is shutting down.
|
|
|
|
// it is shutting down.
|
|
|
|
xds_client_->xds_server_channel_map_.erase(&server_); |
|
|
|
xds_client_->xds_server_channel_map_.erase(server_); |
|
|
|
ads_calld_.reset(); |
|
|
|
ads_calld_.reset(); |
|
|
|
lrs_calld_.reset(); |
|
|
|
lrs_calld_.reset(); |
|
|
|
} |
|
|
|
} |
|
|
@ -486,7 +486,7 @@ void XdsClient::ChannelState::MaybeStartLrsCall() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::StopLrsCallLocked() { |
|
|
|
void XdsClient::ChannelState::StopLrsCallLocked() { |
|
|
|
xds_client_->xds_load_report_server_map_.erase(&server_); |
|
|
|
xds_client_->xds_load_report_server_map_.erase(server_); |
|
|
|
lrs_calld_.reset(); |
|
|
|
lrs_calld_.reset(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -537,7 +537,7 @@ void XdsClient::ChannelState::OnConnectivityStateChangeLocked( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds channel for server %s in " |
|
|
|
"[xds_client %p] xds channel for server %s in " |
|
|
|
"state TRANSIENT_FAILURE: %s", |
|
|
|
"state TRANSIENT_FAILURE: %s", |
|
|
|
xds_client(), server_.server_uri().c_str(), |
|
|
|
xds_client(), server_.server_uri.c_str(), |
|
|
|
status.ToString().c_str()); |
|
|
|
status.ToString().c_str()); |
|
|
|
xds_client_->NotifyOnErrorLocked(absl::UnavailableError( |
|
|
|
xds_client_->NotifyOnErrorLocked(absl::UnavailableError( |
|
|
|
absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ", |
|
|
|
absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ", |
|
|
@ -592,7 +592,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: start new call from retryable " |
|
|
|
"[xds_client %p] xds server %s: start new call from retryable " |
|
|
|
"call %p", |
|
|
|
"call %p", |
|
|
|
chand()->xds_client(), chand()->server_.server_uri().c_str(), this); |
|
|
|
chand()->xds_client(), chand()->server_.server_uri.c_str(), this); |
|
|
|
} |
|
|
|
} |
|
|
|
calld_ = MakeOrphanable<T>( |
|
|
|
calld_ = MakeOrphanable<T>( |
|
|
|
this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call")); |
|
|
|
this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call")); |
|
|
@ -608,7 +608,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: call attempt failed; " |
|
|
|
"[xds_client %p] xds server %s: call attempt failed; " |
|
|
|
"retry timer will fire in %" PRId64 "ms.", |
|
|
|
"retry timer will fire in %" PRId64 "ms.", |
|
|
|
chand()->xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
chand()->xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
timeout.millis()); |
|
|
|
timeout.millis()); |
|
|
|
} |
|
|
|
} |
|
|
|
timer_handle_ = GetDefaultEventEngine()->RunAfter( |
|
|
|
timer_handle_ = GetDefaultEventEngine()->RunAfter( |
|
|
@ -630,8 +630,7 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: retry timer fired (retryable " |
|
|
|
"[xds_client %p] xds server %s: retry timer fired (retryable " |
|
|
|
"call: %p)", |
|
|
|
"call: %p)", |
|
|
|
chand()->xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
chand()->xds_client(), chand()->server_.server_uri.c_str(), this); |
|
|
|
this); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
StartNewCallLocked(); |
|
|
|
StartNewCallLocked(); |
|
|
|
} |
|
|
|
} |
|
|
@ -649,7 +648,7 @@ absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser:: |
|
|
|
"[xds_client %p] xds server %s: received ADS response: type_url=%s, " |
|
|
|
"[xds_client %p] xds server %s: received ADS response: type_url=%s, " |
|
|
|
"version=%s, nonce=%s, num_resources=%" PRIuPTR, |
|
|
|
"version=%s, nonce=%s, num_resources=%" PRIuPTR, |
|
|
|
ads_call_state_->xds_client(), |
|
|
|
ads_call_state_->xds_client(), |
|
|
|
ads_call_state_->chand()->server_.server_uri().c_str(), |
|
|
|
ads_call_state_->chand()->server_.server_uri.c_str(), |
|
|
|
fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(), |
|
|
|
fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(), |
|
|
|
fields.num_resources); |
|
|
|
fields.num_resources); |
|
|
|
} |
|
|
|
} |
|
|
@ -772,8 +771,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( |
|
|
|
"[xds_client %p] xds server %s: server returned new version of " |
|
|
|
"[xds_client %p] xds server %s: server returned new version of " |
|
|
|
"resource for which we previously ignored a deletion: type %s " |
|
|
|
"resource for which we previously ignored a deletion: type %s " |
|
|
|
"name %s", |
|
|
|
"name %s", |
|
|
|
xds_client(), |
|
|
|
xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(), |
|
|
|
ads_call_state_->chand()->server_.server_uri().c_str(), |
|
|
|
|
|
|
|
std::string(type_url).c_str(), std::string(resource_name).c_str()); |
|
|
|
std::string(type_url).c_str(), std::string(resource_name).c_str()); |
|
|
|
resource_state.ignored_deletion = false; |
|
|
|
resource_state.ignored_deletion = false; |
|
|
|
} |
|
|
|
} |
|
|
@ -860,7 +858,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: starting ADS call " |
|
|
|
"[xds_client %p] xds server %s: starting ADS call " |
|
|
|
"(calld: %p, call: %p)", |
|
|
|
"(calld: %p, call: %p)", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), this, |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), this, |
|
|
|
call_.get()); |
|
|
|
call_.get()); |
|
|
|
} |
|
|
|
} |
|
|
|
// If this is a reconnect, add any necessary subscriptions from what's
|
|
|
|
// If this is a reconnect, add any necessary subscriptions from what's
|
|
|
@ -910,7 +908,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: sending ADS request: type=%s " |
|
|
|
"[xds_client %p] xds server %s: sending ADS request: type=%s " |
|
|
|
"version=%s nonce=%s error=%s", |
|
|
|
"version=%s nonce=%s error=%s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
std::string(type->type_url()).c_str(), |
|
|
|
std::string(type->type_url()).c_str(), |
|
|
|
chand()->resource_type_version_map_[type].c_str(), |
|
|
|
chand()->resource_type_version_map_[type].c_str(), |
|
|
|
state.nonce.c_str(), state.status.ToString().c_str()); |
|
|
|
state.nonce.c_str(), state.status.ToString().c_str()); |
|
|
@ -983,7 +981,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"[xds_client %p] xds server %s: error parsing ADS response (%s) " |
|
|
|
"[xds_client %p] xds server %s: error parsing ADS response (%s) " |
|
|
|
"-- ignoring", |
|
|
|
"-- ignoring", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
status.ToString().c_str()); |
|
|
|
status.ToString().c_str()); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
seen_response_ = true; |
|
|
|
seen_response_ = true; |
|
|
@ -1000,7 +998,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( |
|
|
|
"[xds_client %p] xds server %s: ADS response invalid for " |
|
|
|
"[xds_client %p] xds server %s: ADS response invalid for " |
|
|
|
"resource " |
|
|
|
"resource " |
|
|
|
"type %s version %s, will NACK: nonce=%s status=%s", |
|
|
|
"type %s version %s, will NACK: nonce=%s status=%s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
result.type_url.c_str(), result.version.c_str(), |
|
|
|
result.type_url.c_str(), result.version.c_str(), |
|
|
|
state.nonce.c_str(), state.status.ToString().c_str()); |
|
|
|
state.nonce.c_str(), state.status.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
@ -1035,7 +1033,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"[xds_client %p] xds server %s: ignoring deletion " |
|
|
|
"[xds_client %p] xds server %s: ignoring deletion " |
|
|
|
"for resource type %s name %s", |
|
|
|
"for resource type %s name %s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
result.type_url.c_str(), |
|
|
|
result.type_url.c_str(), |
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
XdsClient::ConstructFullXdsResourceName( |
|
|
|
authority, result.type_url.c_str(), resource_key) |
|
|
|
authority, result.type_url.c_str(), resource_key) |
|
|
@ -1077,8 +1075,8 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: ADS call status received " |
|
|
|
"[xds_client %p] xds server %s: ADS call status received " |
|
|
|
"(chand=%p, ads_calld=%p, call=%p): %s", |
|
|
|
"(chand=%p, ads_calld=%p, call=%p): %s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), chand(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), chand(), this, |
|
|
|
this, call_.get(), status.ToString().c_str()); |
|
|
|
call_.get(), status.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
// Ignore status from a stale call.
|
|
|
|
// Ignore status from a stale call.
|
|
|
|
if (IsCurrentCallOnChannel()) { |
|
|
|
if (IsCurrentCallOnChannel()) { |
|
|
@ -1087,7 +1085,7 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( |
|
|
|
// Send error to all watchers.
|
|
|
|
// Send error to all watchers.
|
|
|
|
xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( |
|
|
|
xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( |
|
|
|
"xDS call failed: xDS server: %s, ADS call status: %s", |
|
|
|
"xDS call failed: xDS server: %s, ADS call status: %s", |
|
|
|
chand()->server_.server_uri(), status.ToString().c_str()))); |
|
|
|
chand()->server_.server_uri, status.ToString().c_str()))); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
xds_client()->work_serializer_.DrainQueue(); |
|
|
|
xds_client()->work_serializer_.DrainQueue(); |
|
|
@ -1179,7 +1177,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { |
|
|
|
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); |
|
|
|
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); |
|
|
|
if (old_val && last_report_counters_were_zero_) { |
|
|
|
if (old_val && last_report_counters_were_zero_) { |
|
|
|
auto it = xds_client()->xds_load_report_server_map_.find( |
|
|
|
auto it = xds_client()->xds_load_report_server_map_.find( |
|
|
|
&parent_->chand()->server_); |
|
|
|
parent_->chand()->server_); |
|
|
|
if (it == xds_client()->xds_load_report_server_map_.end() || |
|
|
|
if (it == xds_client()->xds_load_report_server_map_.end() || |
|
|
|
it->second.load_report_map.empty()) { |
|
|
|
it->second.load_report_map.empty()) { |
|
|
|
it->second.channel_state->StopLrsCallLocked(); |
|
|
|
it->second.channel_state->StopLrsCallLocked(); |
|
|
@ -1205,8 +1203,8 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { |
|
|
|
// we just ignore the completion and wait for the timer to fire.
|
|
|
|
// we just ignore the completion and wait for the timer to fire.
|
|
|
|
if (timer_handle_.has_value()) return; |
|
|
|
if (timer_handle_.has_value()) return; |
|
|
|
// If there are no more registered stats to report, cancel the call.
|
|
|
|
// If there are no more registered stats to report, cancel the call.
|
|
|
|
auto it = xds_client()->xds_load_report_server_map_.find( |
|
|
|
auto it = |
|
|
|
&parent_->chand()->server_); |
|
|
|
xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_); |
|
|
|
if (it == xds_client()->xds_load_report_server_map_.end()) return; |
|
|
|
if (it == xds_client()->xds_load_report_server_map_.end()) return; |
|
|
|
if (it->second.load_report_map.empty()) { |
|
|
|
if (it->second.load_report_map.empty()) { |
|
|
|
if (it->second.channel_state != nullptr) { |
|
|
|
if (it->second.channel_state != nullptr) { |
|
|
@ -1249,7 +1247,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: starting LRS call (calld=%p, " |
|
|
|
"[xds_client %p] xds server %s: starting LRS call (calld=%p, " |
|
|
|
"call=%p)", |
|
|
|
"call=%p)", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), this, |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), this, |
|
|
|
call_.get()); |
|
|
|
call_.get()); |
|
|
|
} |
|
|
|
} |
|
|
|
// Send the initial request.
|
|
|
|
// Send the initial request.
|
|
|
@ -1313,7 +1311,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( |
|
|
|
if (!status.ok()) { |
|
|
|
if (!status.ok()) { |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"[xds_client %p] xds server %s: LRS response parsing failed: %s", |
|
|
|
"[xds_client %p] xds server %s: LRS response parsing failed: %s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
status.ToString().c_str()); |
|
|
|
status.ToString().c_str()); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
@ -1324,7 +1322,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( |
|
|
|
"[xds_client %p] xds server %s: LRS response received, %" PRIuPTR |
|
|
|
"[xds_client %p] xds server %s: LRS response received, %" PRIuPTR |
|
|
|
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 |
|
|
|
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 |
|
|
|
"ms", |
|
|
|
"ms", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
new_cluster_names.size(), send_all_clusters, |
|
|
|
new_cluster_names.size(), send_all_clusters, |
|
|
|
new_load_reporting_interval.millis()); |
|
|
|
new_load_reporting_interval.millis()); |
|
|
|
size_t i = 0; |
|
|
|
size_t i = 0; |
|
|
@ -1341,7 +1339,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: increased load_report_interval " |
|
|
|
"[xds_client %p] xds server %s: increased load_report_interval " |
|
|
|
"to minimum value %dms", |
|
|
|
"to minimum value %dms", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), |
|
|
|
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); |
|
|
|
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1353,7 +1351,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: incoming LRS response identical " |
|
|
|
"[xds_client %p] xds server %s: incoming LRS response identical " |
|
|
|
"to current, ignoring.", |
|
|
|
"to current, ignoring.", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str()); |
|
|
|
xds_client(), chand()->server_.server_uri.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
@ -1374,7 +1372,7 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceived( |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[xds_client %p] xds server %s: LRS call status received " |
|
|
|
"[xds_client %p] xds server %s: LRS call status received " |
|
|
|
"(chand=%p, calld=%p, call=%p): %s", |
|
|
|
"(chand=%p, calld=%p, call=%p): %s", |
|
|
|
xds_client(), chand()->server_.server_uri().c_str(), chand(), this, |
|
|
|
xds_client(), chand()->server_.server_uri.c_str(), chand(), this, |
|
|
|
call_.get(), status.ToString().c_str()); |
|
|
|
call_.get(), status.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
// Ignore status from a stale call.
|
|
|
|
// Ignore status from a stale call.
|
|
|
@ -1409,7 +1407,6 @@ XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap, |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); |
|
|
|
} |
|
|
|
} |
|
|
|
GPR_ASSERT(bootstrap_ != nullptr); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
XdsClient::~XdsClient() { |
|
|
|
XdsClient::~XdsClient() { |
|
|
@ -1438,14 +1435,14 @@ void XdsClient::Orphan() { |
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked( |
|
|
|
RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked( |
|
|
|
const XdsBootstrap::XdsServer& server, const char* reason) { |
|
|
|
const XdsBootstrap::XdsServer& server, const char* reason) { |
|
|
|
auto it = xds_server_channel_map_.find(&server); |
|
|
|
auto it = xds_server_channel_map_.find(server); |
|
|
|
if (it != xds_server_channel_map_.end()) { |
|
|
|
if (it != xds_server_channel_map_.end()) { |
|
|
|
return it->second->Ref(DEBUG_LOCATION, reason); |
|
|
|
return it->second->Ref(DEBUG_LOCATION, reason); |
|
|
|
} |
|
|
|
} |
|
|
|
// Channel not found, so create a new one.
|
|
|
|
// Channel not found, so create a new one.
|
|
|
|
auto channel_state = MakeRefCounted<ChannelState>( |
|
|
|
auto channel_state = MakeRefCounted<ChannelState>( |
|
|
|
WeakRef(DEBUG_LOCATION, "ChannelState"), server); |
|
|
|
WeakRef(DEBUG_LOCATION, "ChannelState"), server); |
|
|
|
xds_server_channel_map_[&server] = channel_state.get(); |
|
|
|
xds_server_channel_map_[server] = channel_state.get(); |
|
|
|
return channel_state; |
|
|
|
return channel_state; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1485,7 +1482,9 @@ void XdsClient::WatchResource(const XdsResourceType* type, |
|
|
|
"\" not present in bootstrap config"))); |
|
|
|
"\" not present in bootstrap config"))); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
xds_server = authority->server(); |
|
|
|
if (!authority->xds_servers.empty()) { |
|
|
|
|
|
|
|
xds_server = &authority->xds_servers[0]; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (xds_server == nullptr) xds_server = &bootstrap_->server(); |
|
|
|
if (xds_server == nullptr) xds_server = &bootstrap_->server(); |
|
|
|
{ |
|
|
|
{ |
|
|
@ -1635,8 +1634,7 @@ std::string XdsClient::ConstructFullXdsResourceName( |
|
|
|
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( |
|
|
|
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
absl::string_view eds_service_name) { |
|
|
|
absl::string_view eds_service_name) { |
|
|
|
const auto* server = bootstrap_->FindXdsServer(xds_server); |
|
|
|
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr; |
|
|
|
if (server == nullptr) return nullptr; |
|
|
|
|
|
|
|
auto key = |
|
|
|
auto key = |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); |
|
|
|
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats; |
|
|
|
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats; |
|
|
@ -1648,10 +1646,11 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( |
|
|
|
// XdsBootstrap::XdsServer and strings
|
|
|
|
// XdsBootstrap::XdsServer and strings
|
|
|
|
// in the load_report_map_ key, so that they have the same lifetime.
|
|
|
|
// in the load_report_map_ key, so that they have the same lifetime.
|
|
|
|
auto server_it = |
|
|
|
auto server_it = |
|
|
|
xds_load_report_server_map_.emplace(server, LoadReportServer()).first; |
|
|
|
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()) |
|
|
|
|
|
|
|
.first; |
|
|
|
if (server_it->second.channel_state == nullptr) { |
|
|
|
if (server_it->second.channel_state == nullptr) { |
|
|
|
server_it->second.channel_state = GetOrCreateChannelStateLocked( |
|
|
|
server_it->second.channel_state = GetOrCreateChannelStateLocked( |
|
|
|
*server, "load report map (drop stats)"); |
|
|
|
xds_server, "load report map (drop stats)"); |
|
|
|
} |
|
|
|
} |
|
|
|
auto load_report_it = server_it->second.load_report_map |
|
|
|
auto load_report_it = server_it->second.load_report_map |
|
|
|
.emplace(std::move(key), LoadReportState()) |
|
|
|
.emplace(std::move(key), LoadReportState()) |
|
|
@ -1666,7 +1665,7 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( |
|
|
|
load_report_state.drop_stats->GetSnapshotAndReset(); |
|
|
|
load_report_state.drop_stats->GetSnapshotAndReset(); |
|
|
|
} |
|
|
|
} |
|
|
|
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>( |
|
|
|
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>( |
|
|
|
Ref(DEBUG_LOCATION, "DropStats"), *server, |
|
|
|
Ref(DEBUG_LOCATION, "DropStats"), server_it->first, |
|
|
|
load_report_it->first.first /*cluster_name*/, |
|
|
|
load_report_it->first.first /*cluster_name*/, |
|
|
|
load_report_it->first.second /*eds_service_name*/); |
|
|
|
load_report_it->first.second /*eds_service_name*/); |
|
|
|
load_report_state.drop_stats = cluster_drop_stats.get(); |
|
|
|
load_report_state.drop_stats = cluster_drop_stats.get(); |
|
|
@ -1681,10 +1680,8 @@ void XdsClient::RemoveClusterDropStats( |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
absl::string_view eds_service_name, |
|
|
|
absl::string_view eds_service_name, |
|
|
|
XdsClusterDropStats* cluster_drop_stats) { |
|
|
|
XdsClusterDropStats* cluster_drop_stats) { |
|
|
|
const auto* server = bootstrap_->FindXdsServer(xds_server); |
|
|
|
|
|
|
|
if (server == nullptr) return; |
|
|
|
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
MutexLock lock(&mu_); |
|
|
|
auto server_it = xds_load_report_server_map_.find(server); |
|
|
|
auto server_it = xds_load_report_server_map_.find(xds_server); |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return; |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return; |
|
|
|
auto load_report_it = server_it->second.load_report_map.find( |
|
|
|
auto load_report_it = server_it->second.load_report_map.find( |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); |
|
|
@ -1703,8 +1700,7 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, |
|
|
|
absl::string_view eds_service_name, |
|
|
|
absl::string_view eds_service_name, |
|
|
|
RefCountedPtr<XdsLocalityName> locality) { |
|
|
|
RefCountedPtr<XdsLocalityName> locality) { |
|
|
|
const auto* server = bootstrap_->FindXdsServer(xds_server); |
|
|
|
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr; |
|
|
|
if (server == nullptr) return nullptr; |
|
|
|
|
|
|
|
auto key = |
|
|
|
auto key = |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); |
|
|
|
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats; |
|
|
|
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats; |
|
|
@ -1716,10 +1712,11 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( |
|
|
|
// XdsBootstrap::XdsServer and strings
|
|
|
|
// XdsBootstrap::XdsServer and strings
|
|
|
|
// in the load_report_map_ key, so that they have the same lifetime.
|
|
|
|
// in the load_report_map_ key, so that they have the same lifetime.
|
|
|
|
auto server_it = |
|
|
|
auto server_it = |
|
|
|
xds_load_report_server_map_.emplace(server, LoadReportServer()).first; |
|
|
|
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()) |
|
|
|
|
|
|
|
.first; |
|
|
|
if (server_it->second.channel_state == nullptr) { |
|
|
|
if (server_it->second.channel_state == nullptr) { |
|
|
|
server_it->second.channel_state = GetOrCreateChannelStateLocked( |
|
|
|
server_it->second.channel_state = GetOrCreateChannelStateLocked( |
|
|
|
*server, "load report map (locality stats)"); |
|
|
|
xds_server, "load report map (locality stats)"); |
|
|
|
} |
|
|
|
} |
|
|
|
auto load_report_it = server_it->second.load_report_map |
|
|
|
auto load_report_it = server_it->second.load_report_map |
|
|
|
.emplace(std::move(key), LoadReportState()) |
|
|
|
.emplace(std::move(key), LoadReportState()) |
|
|
@ -1736,7 +1733,7 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( |
|
|
|
locality_state.locality_stats->GetSnapshotAndReset(); |
|
|
|
locality_state.locality_stats->GetSnapshotAndReset(); |
|
|
|
} |
|
|
|
} |
|
|
|
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>( |
|
|
|
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>( |
|
|
|
Ref(DEBUG_LOCATION, "LocalityStats"), *server, |
|
|
|
Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first, |
|
|
|
load_report_it->first.first /*cluster_name*/, |
|
|
|
load_report_it->first.first /*cluster_name*/, |
|
|
|
load_report_it->first.second /*eds_service_name*/, |
|
|
|
load_report_it->first.second /*eds_service_name*/, |
|
|
|
std::move(locality)); |
|
|
|
std::move(locality)); |
|
|
@ -1753,10 +1750,8 @@ void XdsClient::RemoveClusterLocalityStats( |
|
|
|
absl::string_view eds_service_name, |
|
|
|
absl::string_view eds_service_name, |
|
|
|
const RefCountedPtr<XdsLocalityName>& locality, |
|
|
|
const RefCountedPtr<XdsLocalityName>& locality, |
|
|
|
XdsClusterLocalityStats* cluster_locality_stats) { |
|
|
|
XdsClusterLocalityStats* cluster_locality_stats) { |
|
|
|
const auto* server = bootstrap_->FindXdsServer(xds_server); |
|
|
|
|
|
|
|
if (server == nullptr) return; |
|
|
|
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
MutexLock lock(&mu_); |
|
|
|
auto server_it = xds_load_report_server_map_.find(server); |
|
|
|
auto server_it = xds_load_report_server_map_.find(xds_server); |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return; |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return; |
|
|
|
auto load_report_it = server_it->second.load_report_map.find( |
|
|
|
auto load_report_it = server_it->second.load_report_map.find( |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); |
|
|
|
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); |
|
|
@ -1785,8 +1780,8 @@ void XdsClient::NotifyOnErrorLocked(absl::Status status) { |
|
|
|
const auto* node = bootstrap_->node(); |
|
|
|
const auto* node = bootstrap_->node(); |
|
|
|
if (node != nullptr) { |
|
|
|
if (node != nullptr) { |
|
|
|
status = absl::Status( |
|
|
|
status = absl::Status( |
|
|
|
status.code(), |
|
|
|
status.code(), absl::StrCat(status.message(), |
|
|
|
absl::StrCat(status.message(), " (node ID:", node->id(), ")")); |
|
|
|
" (node ID:", bootstrap_->node()->id, ")")); |
|
|
|
} |
|
|
|
} |
|
|
|
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers; |
|
|
|
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers; |
|
|
|
for (const auto& a : authority_state_map_) { // authority
|
|
|
|
for (const auto& a : authority_state_map_) { // authority
|
|
|
@ -1816,8 +1811,8 @@ void XdsClient::NotifyWatchersOnErrorLocked( |
|
|
|
const auto* node = bootstrap_->node(); |
|
|
|
const auto* node = bootstrap_->node(); |
|
|
|
if (node != nullptr) { |
|
|
|
if (node != nullptr) { |
|
|
|
status = absl::Status( |
|
|
|
status = absl::Status( |
|
|
|
status.code(), |
|
|
|
status.code(), absl::StrCat(status.message(), |
|
|
|
absl::StrCat(status.message(), " (node ID:", node->id(), ")")); |
|
|
|
" (node ID:", bootstrap_->node()->id, ")")); |
|
|
|
} |
|
|
|
} |
|
|
|
work_serializer_.Schedule( |
|
|
|
work_serializer_.Schedule( |
|
|
|
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { |
|
|
|
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { |
|
|
@ -1847,7 +1842,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); |
|
|
|
} |
|
|
|
} |
|
|
|
XdsApi::ClusterLoadReportMap snapshot_map; |
|
|
|
XdsApi::ClusterLoadReportMap snapshot_map; |
|
|
|
auto server_it = xds_load_report_server_map_.find(&xds_server); |
|
|
|
auto server_it = xds_load_report_server_map_.find(xds_server); |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return snapshot_map; |
|
|
|
if (server_it == xds_load_report_server_map_.end()) return snapshot_map; |
|
|
|
auto& load_report_map = server_it->second.load_report_map; |
|
|
|
auto& load_report_map = server_it->second.load_report_map; |
|
|
|
for (auto load_report_it = load_report_map.begin(); |
|
|
|
for (auto load_report_it = load_report_map.begin(); |
|
|
|