|
|
@ -136,10 +136,10 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
|
|
|
|
|
|
|
void SubscribeLocked(const std::string& type_url, |
|
|
|
void SubscribeLocked(const std::string& type_url, |
|
|
|
const XdsApi::ResourceName& resource) |
|
|
|
const XdsApi::ResourceName& name) |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); |
|
|
|
void UnsubscribeLocked(const std::string& type_url, |
|
|
|
void UnsubscribeLocked(const std::string& type_url, |
|
|
|
const XdsApi::ResourceName& resource, |
|
|
|
const XdsApi::ResourceName& name, |
|
|
|
bool delay_unsubscription) |
|
|
|
bool delay_unsubscription) |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); |
|
|
|
|
|
|
|
|
|
|
@ -148,24 +148,20 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
private: |
|
|
|
private: |
|
|
|
class ResourceState : public InternallyRefCounted<ResourceState> { |
|
|
|
class ResourceState : public InternallyRefCounted<ResourceState> { |
|
|
|
public: |
|
|
|
public: |
|
|
|
ResourceState(const std::string& type_url, |
|
|
|
ResourceState(const std::string& type_url, const XdsApi::ResourceName& name) |
|
|
|
const XdsApi::ResourceName& resource, |
|
|
|
: type_url_(type_url), name_(name) { |
|
|
|
bool sent_initial_request) |
|
|
|
|
|
|
|
: type_url_(type_url), |
|
|
|
|
|
|
|
resource_(resource), |
|
|
|
|
|
|
|
sent_initial_request_(sent_initial_request) { |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, |
|
|
|
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void Orphan() override { |
|
|
|
void Orphan() override { |
|
|
|
Finish(); |
|
|
|
MaybeCancelTimer(); |
|
|
|
Unref(DEBUG_LOCATION, "Orphan"); |
|
|
|
Unref(DEBUG_LOCATION, "Orphan"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void Start(RefCountedPtr<AdsCallState> ads_calld) { |
|
|
|
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld) { |
|
|
|
if (sent_initial_request_) return; |
|
|
|
if (timer_started_) return; |
|
|
|
sent_initial_request_ = true; |
|
|
|
timer_started_ = true; |
|
|
|
ads_calld_ = std::move(ads_calld); |
|
|
|
ads_calld_ = std::move(ads_calld); |
|
|
|
Ref(DEBUG_LOCATION, "timer").release(); |
|
|
|
Ref(DEBUG_LOCATION, "timer").release(); |
|
|
|
timer_pending_ = true; |
|
|
|
timer_pending_ = true; |
|
|
@ -175,7 +171,7 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
&timer_callback_); |
|
|
|
&timer_callback_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void Finish() { |
|
|
|
void MaybeCancelTimer() { |
|
|
|
if (timer_pending_) { |
|
|
|
if (timer_pending_) { |
|
|
|
grpc_timer_cancel(&timer_); |
|
|
|
grpc_timer_cancel(&timer_); |
|
|
|
timer_pending_ = false; |
|
|
|
timer_pending_ = false; |
|
|
@ -201,8 +197,8 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( |
|
|
|
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( |
|
|
|
"timeout obtaining resource {type=%s name=%s} from xds server", |
|
|
|
"timeout obtaining resource {type=%s name=%s} from xds server", |
|
|
|
type_url_, |
|
|
|
type_url_, |
|
|
|
XdsApi::ConstructFullResourceName(resource_.authority, |
|
|
|
XdsApi::ConstructFullResourceName(name_.authority, type_url_, |
|
|
|
type_url_, resource_.id))); |
|
|
|
name_.id))); |
|
|
|
watcher_error = grpc_error_set_int( |
|
|
|
watcher_error = grpc_error_set_int( |
|
|
|
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
|
|
|
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
@ -210,28 +206,27 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
grpc_error_std_string(watcher_error).c_str()); |
|
|
|
grpc_error_std_string(watcher_error).c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
auto& authority_state = |
|
|
|
auto& authority_state = |
|
|
|
ads_calld_->xds_client()->authority_state_map_[resource_.authority]; |
|
|
|
ads_calld_->xds_client()->authority_state_map_[name_.authority]; |
|
|
|
if (type_url_ == XdsApi::kLdsTypeUrl) { |
|
|
|
if (type_url_ == XdsApi::kLdsTypeUrl) { |
|
|
|
ListenerState& state = authority_state.listener_map[resource_.id]; |
|
|
|
ListenerState& state = authority_state.listener_map[name_.id]; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (type_url_ == XdsApi::kRdsTypeUrl) { |
|
|
|
} else if (type_url_ == XdsApi::kRdsTypeUrl) { |
|
|
|
RouteConfigState& state = |
|
|
|
RouteConfigState& state = authority_state.route_config_map[name_.id]; |
|
|
|
authority_state.route_config_map[resource_.id]; |
|
|
|
|
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (type_url_ == XdsApi::kCdsTypeUrl) { |
|
|
|
} else if (type_url_ == XdsApi::kCdsTypeUrl) { |
|
|
|
ClusterState& state = authority_state.cluster_map[resource_.id]; |
|
|
|
ClusterState& state = authority_state.cluster_map[name_.id]; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (type_url_ == XdsApi::kEdsTypeUrl) { |
|
|
|
} else if (type_url_ == XdsApi::kEdsTypeUrl) { |
|
|
|
EndpointState& state = authority_state.endpoint_map[resource_.id]; |
|
|
|
EndpointState& state = authority_state.endpoint_map[name_.id]; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
@ -245,10 +240,10 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const std::string type_url_; |
|
|
|
const std::string type_url_; |
|
|
|
const XdsApi::ResourceName resource_; |
|
|
|
const XdsApi::ResourceName name_; |
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<AdsCallState> ads_calld_; |
|
|
|
RefCountedPtr<AdsCallState> ads_calld_; |
|
|
|
bool sent_initial_request_; |
|
|
|
bool timer_started_ = false; |
|
|
|
bool timer_pending_ = false; |
|
|
|
bool timer_pending_ = false; |
|
|
|
grpc_timer timer_; |
|
|
|
grpc_timer timer_; |
|
|
|
grpc_closure timer_callback_; |
|
|
|
grpc_closure timer_callback_; |
|
|
@ -577,7 +572,7 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::SubscribeLocked( |
|
|
|
void XdsClient::ChannelState::SubscribeLocked( |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& resource) { |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& name) { |
|
|
|
if (ads_calld_ == nullptr) { |
|
|
|
if (ads_calld_ == nullptr) { |
|
|
|
// Start the ADS call if this is the first request.
|
|
|
|
// Start the ADS call if this is the first request.
|
|
|
|
ads_calld_.reset(new RetryableCall<AdsCallState>( |
|
|
|
ads_calld_.reset(new RetryableCall<AdsCallState>( |
|
|
@ -591,16 +586,16 @@ void XdsClient::ChannelState::SubscribeLocked( |
|
|
|
// because when the call is restarted it will resend all necessary requests.
|
|
|
|
// because when the call is restarted it will resend all necessary requests.
|
|
|
|
if (ads_calld() == nullptr) return; |
|
|
|
if (ads_calld() == nullptr) return; |
|
|
|
// Subscribe to this resource if the ADS call is active.
|
|
|
|
// Subscribe to this resource if the ADS call is active.
|
|
|
|
ads_calld()->SubscribeLocked(type_url, resource); |
|
|
|
ads_calld()->SubscribeLocked(type_url, name); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::UnsubscribeLocked( |
|
|
|
void XdsClient::ChannelState::UnsubscribeLocked( |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& resource, |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& name, |
|
|
|
bool delay_unsubscription) { |
|
|
|
bool delay_unsubscription) { |
|
|
|
if (ads_calld_ != nullptr) { |
|
|
|
if (ads_calld_ != nullptr) { |
|
|
|
auto* calld = ads_calld_->calld(); |
|
|
|
auto* calld = ads_calld_->calld(); |
|
|
|
if (calld != nullptr) { |
|
|
|
if (calld != nullptr) { |
|
|
|
calld->UnsubscribeLocked(type_url, resource, delay_unsubscription); |
|
|
|
calld->UnsubscribeLocked(type_url, name, delay_unsubscription); |
|
|
|
if (!calld->HasSubscribedResources()) { |
|
|
|
if (!calld->HasSubscribedResources()) { |
|
|
|
ads_calld_.reset(); |
|
|
|
ads_calld_.reset(); |
|
|
|
} |
|
|
|
} |
|
|
@ -901,25 +896,23 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::SubscribeLocked( |
|
|
|
void XdsClient::ChannelState::AdsCallState::SubscribeLocked( |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& resource) { |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& name) { |
|
|
|
auto& state = state_map_[type_url] |
|
|
|
auto& state = |
|
|
|
.subscribed_resources[resource.authority][resource.id]; |
|
|
|
state_map_[type_url].subscribed_resources[name.authority][name.id]; |
|
|
|
if (state == nullptr) { |
|
|
|
if (state == nullptr) { |
|
|
|
state = MakeOrphanable<ResourceState>( |
|
|
|
state = MakeOrphanable<ResourceState>(type_url, name); |
|
|
|
type_url, resource, |
|
|
|
|
|
|
|
!chand()->resource_type_version_map_[type_url].empty()); |
|
|
|
|
|
|
|
SendMessageLocked(type_url); |
|
|
|
SendMessageLocked(type_url); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( |
|
|
|
void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& resource, |
|
|
|
const std::string& type_url, const XdsApi::ResourceName& name, |
|
|
|
bool delay_unsubscription) { |
|
|
|
bool delay_unsubscription) { |
|
|
|
auto& type_state_map = state_map_[type_url]; |
|
|
|
auto& type_state_map = state_map_[type_url]; |
|
|
|
auto& authority_map = type_state_map.subscribed_resources[resource.authority]; |
|
|
|
auto& authority_map = type_state_map.subscribed_resources[name.authority]; |
|
|
|
authority_map.erase(resource.id); |
|
|
|
authority_map.erase(name.id); |
|
|
|
if (authority_map.empty()) { |
|
|
|
if (authority_map.empty()) { |
|
|
|
type_state_map.subscribed_resources.erase(resource.authority); |
|
|
|
type_state_map.subscribed_resources.erase(name.authority); |
|
|
|
} |
|
|
|
} |
|
|
|
if (!delay_unsubscription) SendMessageLocked(type_url); |
|
|
|
if (!delay_unsubscription) SendMessageLocked(type_url); |
|
|
|
} |
|
|
|
} |
|
|
@ -960,15 +953,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( |
|
|
|
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; |
|
|
|
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; |
|
|
|
std::set<std::string> rds_resource_names_seen; |
|
|
|
std::set<std::string> rds_resource_names_seen; |
|
|
|
for (auto& p : lds_update_map) { |
|
|
|
for (auto& p : lds_update_map) { |
|
|
|
const XdsApi::ResourceName& resource = p.first; |
|
|
|
const XdsApi::ResourceName& name = p.first; |
|
|
|
XdsApi::LdsUpdate& lds_update = p.second.resource; |
|
|
|
XdsApi::LdsUpdate& lds_update = p.second.resource; |
|
|
|
auto& state = |
|
|
|
auto it = lds_state.subscribed_resources.find(name.authority); |
|
|
|
lds_state.subscribed_resources[resource.authority][resource.id]; |
|
|
|
if (it != lds_state.subscribed_resources.end()) { |
|
|
|
if (state != nullptr) state->Finish(); |
|
|
|
auto res_it = it->second.find(name.id); |
|
|
|
|
|
|
|
if (res_it != it->second.end()) { |
|
|
|
|
|
|
|
res_it->second->MaybeCancelTimer(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(), |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(), |
|
|
|
XdsApi::ConstructFullResourceName( |
|
|
|
XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kLdsTypeUrl, resource.id) |
|
|
|
XdsApi::kLdsTypeUrl, name.id) |
|
|
|
.c_str(), |
|
|
|
.c_str(), |
|
|
|
lds_update.ToString().c_str()); |
|
|
|
lds_update.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
@ -977,10 +974,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( |
|
|
|
rds_resource_names_seen.insert( |
|
|
|
rds_resource_names_seen.insert( |
|
|
|
lds_update.http_connection_manager.route_config_name); |
|
|
|
lds_update.http_connection_manager.route_config_name); |
|
|
|
} |
|
|
|
} |
|
|
|
ListenerState& listener_state = |
|
|
|
ListenerState& listener_state = xds_client() |
|
|
|
xds_client() |
|
|
|
->authority_state_map_[name.authority] |
|
|
|
->authority_state_map_[resource.authority] |
|
|
|
.listener_map[name.id]; |
|
|
|
.listener_map[resource.id]; |
|
|
|
|
|
|
|
// Ignore identical update.
|
|
|
|
// Ignore identical update.
|
|
|
|
if (listener_state.update.has_value() && |
|
|
|
if (listener_state.update.has_value() && |
|
|
|
*listener_state.update == lds_update) { |
|
|
|
*listener_state.update == lds_update) { |
|
|
@ -989,8 +985,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( |
|
|
|
"[xds_client %p] LDS update for %s identical to current, " |
|
|
|
"[xds_client %p] LDS update for %s identical to current, " |
|
|
|
"ignoring.", |
|
|
|
"ignoring.", |
|
|
|
xds_client(), |
|
|
|
xds_client(), |
|
|
|
XdsApi::ConstructFullResourceName( |
|
|
|
XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kLdsTypeUrl, resource.id) |
|
|
|
XdsApi::kLdsTypeUrl, name.id) |
|
|
|
.c_str()); |
|
|
|
.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
continue; |
|
|
|
continue; |
|
|
@ -1007,14 +1003,14 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( |
|
|
|
// For invalid resources in the update, if they are already in the
|
|
|
|
// For invalid resources in the update, if they are already in the
|
|
|
|
// cache, pretend that they are present in the update, so that we
|
|
|
|
// cache, pretend that they are present in the update, so that we
|
|
|
|
// don't incorrectly consider them deleted below.
|
|
|
|
// don't incorrectly consider them deleted below.
|
|
|
|
for (const auto& resource : resource_names_failed) { |
|
|
|
for (const auto& name : resource_names_failed) { |
|
|
|
auto& listener_map = |
|
|
|
auto& listener_map = |
|
|
|
xds_client()->authority_state_map_[resource.authority].listener_map; |
|
|
|
xds_client()->authority_state_map_[name.authority].listener_map; |
|
|
|
auto it = listener_map.find(resource.id); |
|
|
|
auto it = listener_map.find(name.id); |
|
|
|
if (it != listener_map.end()) { |
|
|
|
if (it != listener_map.end()) { |
|
|
|
auto& update = it->second.update; |
|
|
|
auto& update = it->second.update; |
|
|
|
if (!update.has_value()) continue; |
|
|
|
if (!update.has_value()) continue; |
|
|
|
lds_update_map[resource]; |
|
|
|
lds_update_map[name]; |
|
|
|
if (!update->http_connection_manager.route_config_name.empty()) { |
|
|
|
if (!update->http_connection_manager.route_config_name.empty()) { |
|
|
|
rds_resource_names_seen.insert( |
|
|
|
rds_resource_names_seen.insert( |
|
|
|
update->http_connection_manager.route_config_name); |
|
|
|
update->http_connection_manager.route_config_name); |
|
|
@ -1082,19 +1078,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( |
|
|
|
} |
|
|
|
} |
|
|
|
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; |
|
|
|
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; |
|
|
|
for (auto& p : rds_update_map) { |
|
|
|
for (auto& p : rds_update_map) { |
|
|
|
const XdsApi::ResourceName& resource = p.first; |
|
|
|
const XdsApi::ResourceName& name = p.first; |
|
|
|
XdsApi::RdsUpdate& rds_update = p.second.resource; |
|
|
|
XdsApi::RdsUpdate& rds_update = p.second.resource; |
|
|
|
auto& state = |
|
|
|
auto it = rds_state.subscribed_resources.find(name.authority); |
|
|
|
rds_state.subscribed_resources[resource.authority][resource.id]; |
|
|
|
if (it != rds_state.subscribed_resources.end()) { |
|
|
|
if (state != nullptr) state->Finish(); |
|
|
|
auto res_it = it->second.find(name.id); |
|
|
|
|
|
|
|
if (res_it != it->second.end()) { |
|
|
|
|
|
|
|
res_it->second->MaybeCancelTimer(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(), |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(), |
|
|
|
rds_update.ToString().c_str()); |
|
|
|
rds_update.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
RouteConfigState& route_config_state = |
|
|
|
RouteConfigState& route_config_state = |
|
|
|
xds_client() |
|
|
|
xds_client() |
|
|
|
->authority_state_map_[resource.authority] |
|
|
|
->authority_state_map_[name.authority] |
|
|
|
.route_config_map[resource.id]; |
|
|
|
.route_config_map[name.id]; |
|
|
|
// Ignore identical update.
|
|
|
|
// Ignore identical update.
|
|
|
|
if (route_config_state.update.has_value() && |
|
|
|
if (route_config_state.update.has_value() && |
|
|
|
*route_config_state.update == rds_update) { |
|
|
|
*route_config_state.update == rds_update) { |
|
|
@ -1129,27 +1129,30 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( |
|
|
|
auto& cds_state = state_map_[XdsApi::kCdsTypeUrl]; |
|
|
|
auto& cds_state = state_map_[XdsApi::kCdsTypeUrl]; |
|
|
|
std::set<std::string> eds_resource_names_seen; |
|
|
|
std::set<std::string> eds_resource_names_seen; |
|
|
|
for (auto& p : cds_update_map) { |
|
|
|
for (auto& p : cds_update_map) { |
|
|
|
const XdsApi::ResourceName& resource = p.first; |
|
|
|
const XdsApi::ResourceName& name = p.first; |
|
|
|
XdsApi::CdsUpdate& cds_update = p.second.resource; |
|
|
|
XdsApi::CdsUpdate& cds_update = p.second.resource; |
|
|
|
auto& state = |
|
|
|
auto it = cds_state.subscribed_resources.find(name.authority); |
|
|
|
cds_state.subscribed_resources[resource.authority][resource.id]; |
|
|
|
if (it != cds_state.subscribed_resources.end()) { |
|
|
|
if (state != nullptr) state->Finish(); |
|
|
|
auto res_it = it->second.find(name.id); |
|
|
|
|
|
|
|
if (res_it != it->second.end()) { |
|
|
|
|
|
|
|
res_it->second->MaybeCancelTimer(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(), |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(), |
|
|
|
XdsApi::ConstructFullResourceName( |
|
|
|
XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kCdsTypeUrl, resource.id) |
|
|
|
XdsApi::kCdsTypeUrl, name.id) |
|
|
|
.c_str(), |
|
|
|
.c_str(), |
|
|
|
cds_update.ToString().c_str()); |
|
|
|
cds_update.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
// Record the EDS resource names seen.
|
|
|
|
// Record the EDS resource names seen.
|
|
|
|
eds_resource_names_seen.insert( |
|
|
|
eds_resource_names_seen.insert( |
|
|
|
cds_update.eds_service_name.empty() |
|
|
|
cds_update.eds_service_name.empty() |
|
|
|
? XdsApi::ConstructFullResourceName( |
|
|
|
? XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kCdsTypeUrl, resource.id) |
|
|
|
XdsApi::kCdsTypeUrl, name.id) |
|
|
|
: cds_update.eds_service_name); |
|
|
|
: cds_update.eds_service_name); |
|
|
|
ClusterState& cluster_state = xds_client() |
|
|
|
ClusterState& cluster_state = |
|
|
|
->authority_state_map_[resource.authority] |
|
|
|
xds_client()->authority_state_map_[name.authority].cluster_map[name.id]; |
|
|
|
.cluster_map[resource.id]; |
|
|
|
|
|
|
|
// Ignore identical update.
|
|
|
|
// Ignore identical update.
|
|
|
|
if (cluster_state.update.has_value() && |
|
|
|
if (cluster_state.update.has_value() && |
|
|
|
*cluster_state.update == cds_update) { |
|
|
|
*cluster_state.update == cds_update) { |
|
|
@ -1172,18 +1175,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( |
|
|
|
// For invalid resources in the update, if they are already in the
|
|
|
|
// For invalid resources in the update, if they are already in the
|
|
|
|
// cache, pretend that they are present in the update, so that we
|
|
|
|
// cache, pretend that they are present in the update, so that we
|
|
|
|
// don't incorrectly consider them deleted below.
|
|
|
|
// don't incorrectly consider them deleted below.
|
|
|
|
for (const auto& resource : resource_names_failed) { |
|
|
|
for (const auto& name : resource_names_failed) { |
|
|
|
auto& cluster_map = |
|
|
|
auto& cluster_map = |
|
|
|
xds_client()->authority_state_map_[resource.authority].cluster_map; |
|
|
|
xds_client()->authority_state_map_[name.authority].cluster_map; |
|
|
|
auto it = cluster_map.find(resource.id); |
|
|
|
auto it = cluster_map.find(name.id); |
|
|
|
if (it != cluster_map.end()) { |
|
|
|
if (it != cluster_map.end()) { |
|
|
|
auto& update = it->second.update; |
|
|
|
auto& update = it->second.update; |
|
|
|
if (!update.has_value()) continue; |
|
|
|
if (!update.has_value()) continue; |
|
|
|
cds_update_map[resource]; |
|
|
|
cds_update_map[name]; |
|
|
|
eds_resource_names_seen.insert( |
|
|
|
eds_resource_names_seen.insert( |
|
|
|
update->eds_service_name.empty() |
|
|
|
update->eds_service_name.empty() |
|
|
|
? XdsApi::ConstructFullResourceName( |
|
|
|
? XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kCdsTypeUrl, resource.id) |
|
|
|
XdsApi::kCdsTypeUrl, name.id) |
|
|
|
: update->eds_service_name); |
|
|
|
: update->eds_service_name); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1246,22 +1249,25 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked( |
|
|
|
} |
|
|
|
} |
|
|
|
auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; |
|
|
|
auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; |
|
|
|
for (auto& p : eds_update_map) { |
|
|
|
for (auto& p : eds_update_map) { |
|
|
|
const XdsApi::ResourceName& resource = p.first; |
|
|
|
const XdsApi::ResourceName& name = p.first; |
|
|
|
XdsApi::EdsUpdate& eds_update = p.second.resource; |
|
|
|
XdsApi::EdsUpdate& eds_update = p.second.resource; |
|
|
|
auto& state = |
|
|
|
auto it = eds_state.subscribed_resources.find(name.authority); |
|
|
|
eds_state.subscribed_resources[resource.authority][resource.id]; |
|
|
|
if (it != eds_state.subscribed_resources.end()) { |
|
|
|
if (state != nullptr) state->Finish(); |
|
|
|
auto res_it = it->second.find(name.id); |
|
|
|
|
|
|
|
if (res_it != it->second.end()) { |
|
|
|
|
|
|
|
res_it->second->MaybeCancelTimer(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(), |
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(), |
|
|
|
XdsApi::ConstructFullResourceName( |
|
|
|
XdsApi::ConstructFullResourceName(name.authority, |
|
|
|
resource.authority, XdsApi::kCdsTypeUrl, resource.id) |
|
|
|
XdsApi::kCdsTypeUrl, name.id) |
|
|
|
.c_str(), |
|
|
|
.c_str(), |
|
|
|
eds_update.ToString().c_str()); |
|
|
|
eds_update.ToString().c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
EndpointState& endpoint_state = |
|
|
|
EndpointState& endpoint_state = xds_client() |
|
|
|
xds_client() |
|
|
|
->authority_state_map_[name.authority] |
|
|
|
->authority_state_map_[resource.authority] |
|
|
|
.endpoint_map[name.id]; |
|
|
|
.endpoint_map[resource.id]; |
|
|
|
|
|
|
|
// Ignore identical update.
|
|
|
|
// Ignore identical update.
|
|
|
|
if (endpoint_state.update.has_value() && |
|
|
|
if (endpoint_state.update.has_value() && |
|
|
|
*endpoint_state.update == eds_update) { |
|
|
|
*endpoint_state.update == eds_update) { |
|
|
@ -1538,7 +1544,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( |
|
|
|
for (auto& p : a.second) { |
|
|
|
for (auto& p : a.second) { |
|
|
|
resource_map[a.first].insert(p.first); |
|
|
|
resource_map[a.first].insert(p.first); |
|
|
|
OrphanablePtr<ResourceState>& state = p.second; |
|
|
|
OrphanablePtr<ResourceState>& state = p.second; |
|
|
|
state->Start(Ref(DEBUG_LOCATION, "ResourceState")); |
|
|
|
state->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceState")); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|