diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 9978b55cb4d..e5762d62f11 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -29,6 +29,8 @@ #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/error_utils.h" namespace grpc_core { @@ -66,11 +68,32 @@ class CdsLb : public LoadBalancingPolicy { public: explicit ClusterWatcher(RefCountedPtr parent) : parent_(std::move(parent)) {} - void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override; - void OnError(grpc_error* error) override; - void OnResourceDoesNotExist() override; + + void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override { + new Notifier(parent_, std::move(cluster_data)); + } + void OnError(grpc_error* error) override { new Notifier(parent_, error); } + void OnResourceDoesNotExist() override { new Notifier(parent_); } private: + class Notifier { + public: + Notifier(RefCountedPtr parent, XdsApi::CdsUpdate update); + Notifier(RefCountedPtr parent, grpc_error* error); + explicit Notifier(RefCountedPtr parent); + + private: + enum Type { kUpdate, kError, kDoesNotExist }; + + static void RunInExecCtx(void* arg, grpc_error* error); + void RunInWorkSerializer(grpc_error* error); + + RefCountedPtr parent_; + grpc_closure closure_; + XdsApi::CdsUpdate update_; + Type type_; + }; + RefCountedPtr parent_; }; @@ -94,6 +117,10 @@ class CdsLb : public LoadBalancingPolicy { void ShutdownLocked() override; + void OnClusterChanged(XdsApi::CdsUpdate cluster_data); + void OnError(grpc_error* error); + void OnResourceDoesNotExist(); + void MaybeDestroyChildPolicyLocked(); RefCountedPtr config_; @@ -115,123 +142,50 @@ class CdsLb : public LoadBalancingPolicy { }; // -// CdsLb::ClusterWatcher +// CdsLb::ClusterWatcher::Notifier // -void CdsLb::ClusterWatcher::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, - "[cdslb %p] received CDS update from xds client %p: " - "eds_service_name=%s lrs_load_reporting_server_name=%s", - parent_.get(), parent_->xds_client_.get(), - cluster_data.eds_service_name.c_str(), - cluster_data.lrs_load_reporting_server_name.has_value() - ? cluster_data.lrs_load_reporting_server_name.value().c_str() - : "(unset)"); - } - // Construct config for child policy. - Json::Object child_config = { - {"clusterName", parent_->config_->cluster()}, - {"localityPickingPolicy", - Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }}, - {"endpointPickingPolicy", - Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }}, - }; - if (!cluster_data.eds_service_name.empty()) { - child_config["edsServiceName"] = cluster_data.eds_service_name; - } - if (cluster_data.lrs_load_reporting_server_name.has_value()) { - child_config["lrsLoadReportingServerName"] = - cluster_data.lrs_load_reporting_server_name.value(); - } - Json json = Json::Array{ - Json::Object{ - {"eds_experimental", std::move(child_config)}, - }, - }; - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - std::string json_str = json.Dump(/*indent=*/1); - gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", - parent_.get(), json_str.c_str()); - } - grpc_error* error = GRPC_ERROR_NONE; - RefCountedPtr config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); - if (error != GRPC_ERROR_NONE) { - OnError(error); - return; - } - // Create child policy if not already present. - if (parent_->child_policy_ == nullptr) { - LoadBalancingPolicy::Args args; - args.work_serializer = parent_->work_serializer(); - args.args = parent_->args_; - args.channel_control_helper = absl::make_unique(parent_->Ref()); - parent_->child_policy_ = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(config->name(), - std::move(args)); - if (parent_->child_policy_ == nullptr) { - OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "failed to create child policy")); - return; - } - grpc_pollset_set_add_pollset_set( - parent_->child_policy_->interested_parties(), - parent_->interested_parties()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", - parent_.get(), config->name(), parent_->child_policy_.get()); - } - } - // Update child policy. - UpdateArgs args; - args.config = std::move(config); - args.args = grpc_channel_args_copy(parent_->args_); - parent_->child_policy_->UpdateLocked(std::move(args)); +CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, + XdsApi::CdsUpdate update) + : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } -void CdsLb::ClusterWatcher::OnError(grpc_error* error) { - gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", - parent_.get(), parent_->config_->cluster().c_str(), - grpc_error_string(error)); - // Go into TRANSIENT_FAILURE if we have not yet created the child - // policy (i.e., we have not yet received data from xds). Otherwise, - // we keep running with the data we had previously. - if (parent_->child_policy_ == nullptr) { - parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - } else { - GRPC_ERROR_UNREF(error); - } +CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, + grpc_error* error) + : parent_(std::move(parent)), type_(kError) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, error); } -void CdsLb::ClusterWatcher::OnResourceDoesNotExist() { - gpr_log(GPR_ERROR, - "[cdslb %p] CDS resource for %s does not exist -- reporting " - "TRANSIENT_FAILURE", - parent_.get(), parent_->config_->cluster().c_str()); - grpc_error* error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("CDS resource \"", parent_->config_->cluster(), - "\" does not exist") - .c_str()), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - parent_->MaybeDestroyChildPolicyLocked(); +CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent) + : parent_(std::move(parent)), type_(kDoesNotExist) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); +} + +void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg, + grpc_error* error) { + Notifier* self = static_cast(arg); + GRPC_ERROR_REF(error); + self->parent_->work_serializer()->Run( + [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); +} + +void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { + switch (type_) { + case kUpdate: + parent_->OnClusterChanged(std::move(update_)); + break; + case kError: + parent_->OnError(error); + break; + case kDoesNotExist: + parent_->OnResourceDoesNotExist(); + break; + }; + delete this; } // @@ -356,6 +310,118 @@ void CdsLb::UpdateLocked(UpdateArgs args) { } } +void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, + "[cdslb %p] received CDS update from xds client %p: " + "eds_service_name=%s lrs_load_reporting_server_name=%s", + this, xds_client_.get(), cluster_data.eds_service_name.c_str(), + cluster_data.lrs_load_reporting_server_name.has_value() + ? cluster_data.lrs_load_reporting_server_name.value().c_str() + : "(unset)"); + } + // Construct config for child policy. + Json::Object child_config = { + {"clusterName", config_->cluster()}, + {"localityPickingPolicy", + Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", Json::Object()}, + }}, + }, + }}, + {"endpointPickingPolicy", + Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }}, + }; + if (!cluster_data.eds_service_name.empty()) { + child_config["edsServiceName"] = cluster_data.eds_service_name; + } + if (cluster_data.lrs_load_reporting_server_name.has_value()) { + child_config["lrsLoadReportingServerName"] = + cluster_data.lrs_load_reporting_server_name.value(); + } + Json json = Json::Array{ + Json::Object{ + {"eds_experimental", std::move(child_config)}, + }, + }; + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + std::string json_str = json.Dump(/*indent=*/1); + gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this, + json_str.c_str()); + } + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + if (error != GRPC_ERROR_NONE) { + OnError(error); + return; + } + // Create child policy if not already present. + if (child_policy_ == nullptr) { + LoadBalancingPolicy::Args args; + args.work_serializer = work_serializer(); + args.args = args_; + args.channel_control_helper = absl::make_unique(Ref()); + child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + config->name(), std::move(args)); + if (child_policy_ == nullptr) { + OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "failed to create child policy")); + return; + } + grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), + interested_parties()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, + config->name(), child_policy_.get()); + } + } + // Update child policy. + UpdateArgs args; + args.config = std::move(config); + args.args = grpc_channel_args_copy(args_); + child_policy_->UpdateLocked(std::move(args)); +} + +void CdsLb::OnError(grpc_error* error) { + gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", + this, config_->cluster().c_str(), grpc_error_string(error)); + // Go into TRANSIENT_FAILURE if we have not yet created the child + // policy (i.e., we have not yet received data from xds). Otherwise, + // we keep running with the data we had previously. + if (child_policy_ == nullptr) { + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + } else { + GRPC_ERROR_UNREF(error); + } +} + +void CdsLb::OnResourceDoesNotExist() { + gpr_log(GPR_ERROR, + "[cdslb %p] CDS resource for %s does not exist -- reporting " + "TRANSIENT_FAILURE", + this, config_->cluster().c_str()); + grpc_error* error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("CDS resource \"", config_->cluster(), + "\" does not exist") + .c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + MaybeDestroyChildPolicyLocked(); +} + // // factory // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index dfd9a065b81..a5295b44e66 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -99,7 +99,37 @@ class EdsLb : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: - class EndpointWatcher; + class EndpointWatcher : public XdsClient::EndpointWatcherInterface { + public: + explicit EndpointWatcher(RefCountedPtr parent) + : parent_(std::move(parent)) {} + void OnEndpointChanged(XdsApi::EdsUpdate update) override { + new Notifier(parent_, std::move(update)); + } + void OnError(grpc_error* error) override { new Notifier(parent_, error); } + void OnResourceDoesNotExist() override { new Notifier(parent_); } + + private: + class Notifier { + public: + Notifier(RefCountedPtr parent, XdsApi::EdsUpdate update); + Notifier(RefCountedPtr parent, grpc_error* error); + explicit Notifier(RefCountedPtr parent); + + private: + enum Type { kUpdate, kError, kDoesNotExist }; + + static void RunInExecCtx(void* arg, grpc_error* error); + void RunInWorkSerializer(grpc_error* error); + + RefCountedPtr parent_; + grpc_closure closure_; + XdsApi::EdsUpdate update_; + Type type_; + }; + + RefCountedPtr parent_; + }; // A simple wrapper for ref-counting a picker from the child policy. class ChildPickerWrapper : public RefCounted { @@ -150,6 +180,10 @@ class EdsLb : public LoadBalancingPolicy { void ShutdownLocked() override; + void OnEndpointChanged(XdsApi::EdsUpdate update); + void OnError(grpc_error* error); + void OnResourceDoesNotExist(); + void MaybeDestroyChildPolicyLocked(); void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list); @@ -296,81 +330,51 @@ void EdsLb::Helper::AddTraceEvent(TraceSeverity severity, } // -// EdsLb::EndpointWatcher +// EdsLb::EndpointWatcher::Notifier // -class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { - public: - explicit EndpointWatcher(RefCountedPtr eds_policy) - : eds_policy_(std::move(eds_policy)) {} - - ~EndpointWatcher() { eds_policy_.reset(DEBUG_LOCATION, "EndpointWatcher"); } +EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent, + XdsApi::EdsUpdate update) + : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); +} - void OnEndpointChanged(XdsApi::EdsUpdate update) override { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", - eds_policy_.get()); - } - // Update the drop config. - const bool drop_config_changed = - eds_policy_->drop_config_ == nullptr || - *eds_policy_->drop_config_ != *update.drop_config; - if (drop_config_changed) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Updating drop config", eds_policy_.get()); - } - eds_policy_->drop_config_ = std::move(update.drop_config); - eds_policy_->MaybeUpdateDropPickerLocked(); - } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Drop config unchanged, ignoring", - eds_policy_.get()); - } - // Update priority and locality info. - if (eds_policy_->child_policy_ == nullptr || - eds_policy_->priority_list_ != update.priorities) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Updating priority list", - eds_policy_.get()); - } - eds_policy_->UpdatePriorityList(std::move(update.priorities)); - } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Priority list unchanged, ignoring", - eds_policy_.get()); - } - } +EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent, + grpc_error* error) + : parent_(std::move(parent)), type_(kError) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, error); +} - void OnError(grpc_error* error) override { - gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", - eds_policy_.get(), grpc_error_string(error)); - // Go into TRANSIENT_FAILURE if we have not yet created the child - // policy (i.e., we have not yet received data from xds). Otherwise, - // we keep running with the data we had previously. - if (eds_policy_->child_policy_ == nullptr) { - eds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - } else { - GRPC_ERROR_UNREF(error); - } - } +EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent) + : parent_(std::move(parent)), type_(kDoesNotExist) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); +} - void OnResourceDoesNotExist() override { - gpr_log( - GPR_ERROR, - "[edslb %p] EDS resource does not exist -- reporting TRANSIENT_FAILURE", - eds_policy_.get()); - grpc_error* error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS resource does not exist"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - eds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - eds_policy_->MaybeDestroyChildPolicyLocked(); - } +void EdsLb::EndpointWatcher::Notifier::RunInExecCtx(void* arg, + grpc_error* error) { + Notifier* self = static_cast(arg); + GRPC_ERROR_REF(error); + self->parent_->work_serializer()->Run( + [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); +} - private: - RefCountedPtr eds_policy_; -}; +void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { + switch (type_) { + case kUpdate: + parent_->OnEndpointChanged(std::move(update_)); + break; + case kError: + parent_->OnError(error); + break; + case kDoesNotExist: + parent_->OnResourceDoesNotExist(); + break; + }; + delete this; +} // // EdsLb public methods @@ -399,7 +403,7 @@ EdsLb::EdsLb(Args args) EdsLb::~EdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] destroying xds LB policy", this); + gpr_log(GPR_INFO, "[edslb %p] destroying eds LB policy", this); } } @@ -461,8 +465,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) { // Initialize XdsClient. if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = - MakeOrphanable(work_serializer(), *args_, &error); + xds_client_ = MakeOrphanable(*args_, &error); // TODO(roth): If we decide that we care about EDS-only mode, add // proper error handling here. GPR_ASSERT(error == GRPC_ERROR_NONE); @@ -513,6 +516,62 @@ void EdsLb::ResetBackoffLocked() { } } +void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", this); + } + // Update the drop config. + const bool drop_config_changed = + drop_config_ == nullptr || *drop_config_ != *update.drop_config; + if (drop_config_changed) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] Updating drop config", this); + } + drop_config_ = std::move(update.drop_config); + MaybeUpdateDropPickerLocked(); + } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] Drop config unchanged, ignoring", this); + } + // Update priority and locality info. + if (child_policy_ == nullptr || priority_list_ != update.priorities) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] Updating priority list", this); + } + UpdatePriorityList(std::move(update.priorities)); + } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] Priority list unchanged, ignoring", this); + } +} + +void EdsLb::OnError(grpc_error* error) { + gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", this, + grpc_error_string(error)); + // Go into TRANSIENT_FAILURE if we have not yet created the child + // policy (i.e., we have not yet received data from xds). Otherwise, + // we keep running with the data we had previously. + if (child_policy_ == nullptr) { + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + } else { + GRPC_ERROR_UNREF(error); + } +} + +void EdsLb::OnResourceDoesNotExist() { + gpr_log( + GPR_ERROR, + "[edslb %p] EDS resource does not exist -- reporting TRANSIENT_FAILURE", + this); + grpc_error* error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS resource does not exist"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + MaybeDestroyChildPolicyLocked(); +} + // // child policy-related methods // diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 4d3cb3b8fa4..c4cdde64d36 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -27,6 +27,8 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/timeout_encoding.h" namespace grpc_core { @@ -69,13 +71,34 @@ class XdsResolver : public Resolver { void ShutdownLocked() override; private: + class Notifier { + public: + Notifier(RefCountedPtr parent, XdsApi::LdsUpdate update); + Notifier(RefCountedPtr parent, XdsApi::RdsUpdate update); + Notifier(RefCountedPtr parent, grpc_error* error); + explicit Notifier(RefCountedPtr parent); + + private: + enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist }; + + static void RunInExecCtx(void* arg, grpc_error* error); + void RunInWorkSerializer(grpc_error* error); + + RefCountedPtr resolver_; + grpc_closure closure_; + XdsApi::LdsUpdate update_; + Type type_; + }; + class ListenerWatcher : public XdsClient::ListenerWatcherInterface { public: explicit ListenerWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} - void OnListenerChanged(XdsApi::LdsUpdate listener) override; - void OnError(grpc_error* error) override; - void OnResourceDoesNotExist() override; + void OnListenerChanged(XdsApi::LdsUpdate listener) override { + new Notifier(resolver_, std::move(listener)); + } + void OnError(grpc_error* error) override { new Notifier(resolver_, error); } + void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; @@ -85,9 +108,11 @@ class XdsResolver : public Resolver { public: explicit RouteConfigWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} - void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override; - void OnError(grpc_error* error) override; - void OnResourceDoesNotExist() override; + void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override { + new Notifier(resolver_, std::move(route_config)); + } + void OnError(grpc_error* error) override { new Notifier(resolver_, error); } + void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; @@ -146,6 +171,7 @@ class XdsResolver : public Resolver { std::map> clusters_; }; + void OnListenerUpdate(XdsApi::LdsUpdate lds_update); void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update); void OnError(grpc_error* error); void OnResourceDoesNotExist(); @@ -166,73 +192,67 @@ class XdsResolver : public Resolver { }; // -// XdsResolver::ListenerWatcher +// XdsResolver::Notifier // -void XdsResolver::ListenerWatcher::OnListenerChanged( - XdsApi::LdsUpdate listener) { - if (resolver_->xds_client_ == nullptr) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", - resolver_.get()); - } - if (listener.route_config_name != resolver_->route_config_name_) { - if (resolver_->route_config_watcher_ != nullptr) { - resolver_->xds_client_->CancelRouteConfigDataWatch( - resolver_->route_config_name_, resolver_->route_config_watcher_, - /*delay_unsubscription=*/!listener.route_config_name.empty()); - resolver_->route_config_watcher_ = nullptr; - } - resolver_->route_config_name_ = std::move(listener.route_config_name); - if (!resolver_->route_config_name_.empty()) { - auto watcher = absl::make_unique(resolver_->Ref()); - resolver_->route_config_watcher_ = watcher.get(); - resolver_->xds_client_->WatchRouteConfigData( - resolver_->route_config_name_, std::move(watcher)); - } - } - if (resolver_->route_config_name_.empty()) { - GPR_ASSERT(listener.rds_update.has_value()); - resolver_->OnRouteConfigUpdate(std::move(*listener.rds_update)); - } +XdsResolver::Notifier::Notifier(RefCountedPtr resolver, + XdsApi::LdsUpdate update) + : resolver_(std::move(resolver)), + update_(std::move(update)), + type_(kLdsUpdate) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } -void XdsResolver::ListenerWatcher::OnError(grpc_error* error) { - if (resolver_->xds_client_ == nullptr) return; - gpr_log(GPR_ERROR, "[xds_resolver %p] received listener error: %s", - resolver_.get(), grpc_error_string(error)); - resolver_->OnError(error); +XdsResolver::Notifier::Notifier(RefCountedPtr resolver, + XdsApi::RdsUpdate update) + : resolver_(std::move(resolver)), type_(kRdsUpdate) { + update_.rds_update = std::move(update); + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } -void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() { - if (resolver_->xds_client_ == nullptr) return; - resolver_->OnResourceDoesNotExist(); +XdsResolver::Notifier::Notifier(RefCountedPtr resolver, + grpc_error* error) + : resolver_(std::move(resolver)), type_(kError) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, error); } -// -// XdsResolver::RouteConfigWatcher -// - -void XdsResolver::RouteConfigWatcher::OnRouteConfigChanged( - XdsApi::RdsUpdate route_config) { - if (resolver_->xds_client_ == nullptr) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config data", - resolver_.get()); - } - resolver_->OnRouteConfigUpdate(std::move(route_config)); +XdsResolver::Notifier::Notifier(RefCountedPtr resolver) + : resolver_(std::move(resolver)), type_(kDoesNotExist) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } -void XdsResolver::RouteConfigWatcher::OnError(grpc_error* error) { - if (resolver_->xds_client_ == nullptr) return; - gpr_log(GPR_ERROR, "[xds_resolver %p] received route config error: %s", - resolver_.get(), grpc_error_string(error)); - resolver_->OnError(error); +void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) { + Notifier* self = static_cast(arg); + GRPC_ERROR_REF(error); + self->resolver_->work_serializer()->Run( + [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); } -void XdsResolver::RouteConfigWatcher::OnResourceDoesNotExist() { - if (resolver_->xds_client_ == nullptr) return; - resolver_->OnResourceDoesNotExist(); +void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) { + if (resolver_->xds_client_ == nullptr) { + GRPC_ERROR_UNREF(error); + delete this; + return; + } + switch (type_) { + case kLdsUpdate: + resolver_->OnListenerUpdate(std::move(update_)); + break; + case kRdsUpdate: + resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update)); + break; + case kError: + resolver_->OnError(error); + break; + case kDoesNotExist: + resolver_->OnResourceDoesNotExist(); + break; + }; + delete this; } // @@ -493,7 +513,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = MakeOrphanable(work_serializer(), *args_, &error); + xds_client_ = MakeOrphanable(*args_, &error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " @@ -528,7 +548,34 @@ void XdsResolver::ShutdownLocked() { } } +void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this); + } + if (listener.route_config_name != route_config_name_) { + if (route_config_watcher_ != nullptr) { + xds_client_->CancelRouteConfigDataWatch( + route_config_name_, route_config_watcher_, + /*delay_unsubscription=*/!listener.route_config_name.empty()); + route_config_watcher_ = nullptr; + } + route_config_name_ = std::move(listener.route_config_name); + if (!route_config_name_.empty()) { + auto watcher = absl::make_unique(Ref()); + route_config_watcher_ = watcher.get(); + xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher)); + } + } + if (route_config_name_.empty()) { + GPR_ASSERT(listener.rds_update.has_value()); + OnRouteConfigUpdate(std::move(*listener.rds_update)); + } +} + void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this); + } // Find the relevant VirtualHost from the RouteConfiguration. XdsApi::RdsUpdate::VirtualHost* vhost = rds_update.FindVirtualHostForDomain(server_name_); @@ -546,6 +593,8 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { } void XdsResolver::OnError(grpc_error* error) { + gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s", + this, grpc_error_string(error)); grpc_arg xds_client_arg = xds_client_->MakeChannelArg(); Result result; result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1); diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 7a027a2cf57..e39f18a728e 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -50,7 +50,6 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/slice/slice_internal.h" @@ -169,9 +168,11 @@ class XdsClient::ChannelState::AdsCallState private: static void OnTimer(void* arg, grpc_error* error) { ResourceState* self = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - self->ads_calld_->xds_client()->work_serializer_->Run( - [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); + { + MutexLock lock(&self->ads_calld_->xds_client()->mu_); + self->OnTimerLocked(GRPC_ERROR_REF(error)); + } + self->Unref(DEBUG_LOCATION, "timer"); } void OnTimerLocked(grpc_error* error) { @@ -213,7 +214,6 @@ class XdsClient::ChannelState::AdsCallState GRPC_ERROR_UNREF(watcher_error); } ads_calld_.reset(); - Unref(DEBUG_LOCATION, "timer"); GRPC_ERROR_UNREF(error); } @@ -250,7 +250,7 @@ class XdsClient::ChannelState::AdsCallState static void OnRequestSent(void* arg, grpc_error* error); void OnRequestSentLocked(grpc_error* error); static void OnResponseReceived(void* arg, grpc_error* error); - void OnResponseReceivedLocked(); + bool OnResponseReceivedLocked(); static void OnStatusReceived(void* arg, grpc_error* error); void OnStatusReceivedLocked(grpc_error* error); @@ -327,10 +327,10 @@ class XdsClient::ChannelState::LrsCallState private: void ScheduleNextReportLocked(); static void OnNextReportTimer(void* arg, grpc_error* error); - void OnNextReportTimerLocked(grpc_error* error); + bool OnNextReportTimerLocked(grpc_error* error); void SendReportLocked(); static void OnReportDone(void* arg, grpc_error* error); - void OnReportDoneLocked(grpc_error* error); + bool OnReportDoneLocked(grpc_error* error); bool IsCurrentReporterOnCall() const { return this == parent_->reporter_.get(); @@ -352,7 +352,7 @@ class XdsClient::ChannelState::LrsCallState static void OnInitialRequestSent(void* arg, grpc_error* error); void OnInitialRequestSentLocked(); static void OnResponseReceived(void* arg, grpc_error* error); - void OnResponseReceivedLocked(); + bool OnResponseReceivedLocked(); static void OnStatusReceived(void* arg, grpc_error* error); void OnStatusReceivedLocked(grpc_error* error); @@ -397,13 +397,12 @@ class XdsClient::ChannelState::StateWatcher : public AsyncConnectivityStateWatcherInterface { public: explicit StateWatcher(RefCountedPtr parent) - : AsyncConnectivityStateWatcherInterface( - parent->xds_client()->work_serializer_), - parent_(std::move(parent)) {} + : parent_(std::move(parent)) {} private: void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { + MutexLock lock(&parent_->xds_client_->mu_); if (!parent_->shutting_down_ && new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // In TRANSIENT_FAILURE. Notify all watchers of error. @@ -411,8 +410,9 @@ class XdsClient::ChannelState::StateWatcher "[xds_client %p] xds channel in state:TRANSIENT_FAILURE " "status_message:(%s)", parent_->xds_client(), status.ToString().c_str()); - parent_->xds_client()->NotifyOnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "xds channel in TRANSIENT_FAILURE")); + parent_->xds_client()->NotifyOnErrorLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "xds channel in TRANSIENT_FAILURE")); } } @@ -655,9 +655,11 @@ template void XdsClient::ChannelState::RetryableCall::OnRetryTimer( void* arg, grpc_error* error) { RetryableCall* calld = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - calld->chand_->xds_client()->work_serializer_->Run( - [calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION); + { + MutexLock lock(&calld->chand_->xds_client()->mu_); + calld->OnRetryTimerLocked(GRPC_ERROR_REF(error)); + } + calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); } template @@ -673,7 +675,6 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( } StartNewCallLocked(); } - this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); GRPC_ERROR_UNREF(error); } @@ -1125,10 +1126,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - ads_calld->xds_client()->work_serializer_->Run( - [ads_calld, error]() { ads_calld->OnRequestSentLocked(error); }, - DEBUG_LOCATION); + { + MutexLock lock(&ads_calld->xds_client()->mu_); + ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error)); + } + ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); } void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( @@ -1152,22 +1154,24 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( buffered_requests_.erase(it); } } - Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); GRPC_ERROR_UNREF(error); } void XdsClient::ChannelState::AdsCallState::OnResponseReceived( void* arg, grpc_error* /* error */) { AdsCallState* ads_calld = static_cast(arg); - ads_calld->xds_client()->work_serializer_->Run( - [ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); + bool done; + { + MutexLock lock(&ads_calld->xds_client()->mu_); + done = ads_calld->OnResponseReceivedLocked(); + } + if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); } -void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { +bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Empty payload means the call was cancelled. if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { - Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); - return; + return true; } // Read the response. grpc_byte_buffer_reader bbr; @@ -1227,10 +1231,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { } } } - if (xds_client()->shutting_down_) { - Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown"); - return; - } + if (xds_client()->shutting_down_) return true; // Keep listening for updates. grpc_op op; memset(&op, 0, sizeof(op)); @@ -1243,15 +1244,17 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { const grpc_call_error call_error = grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); + return false; } void XdsClient::ChannelState::AdsCallState::OnStatusReceived( void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - ads_calld->xds_client()->work_serializer_->Run( - [ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); }, - DEBUG_LOCATION); + { + MutexLock lock(&ads_calld->xds_client()->mu_); + ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error)); + } + ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); } void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( @@ -1270,10 +1273,9 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( // Try to restart the call. parent_->OnCallFinishedLocked(); // Send error to all watchers. - xds_client()->NotifyOnError( + xds_client()->NotifyOnErrorLocked( GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed")); } - Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); GRPC_ERROR_UNREF(error); } @@ -1320,21 +1322,23 @@ void XdsClient::ChannelState::LrsCallState::Reporter:: void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_client()->work_serializer_->Run( - [self, error]() { self->OnNextReportTimerLocked(error); }, - DEBUG_LOCATION); + bool done; + { + MutexLock lock(&self->xds_client()->mu_); + done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error)); + } + if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer"); } -void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( +bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( grpc_error* error) { next_report_timer_callback_pending_ = false; if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { - Unref(DEBUG_LOCATION, "Reporter+timer"); - } else { - SendReportLocked(); + GRPC_ERROR_UNREF(error); + return true; } - GRPC_ERROR_UNREF(error); + SendReportLocked(); + return false; } namespace { @@ -1357,8 +1361,9 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. - XdsApi::ClusterLoadReportMap snapshot = xds_client()->BuildLoadReportSnapshot( - parent_->send_all_clusters_, parent_->cluster_names_); + XdsApi::ClusterLoadReportMap snapshot = + xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_, + parent_->cluster_names_); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. const bool old_val = last_report_counters_were_zero_; @@ -1391,32 +1396,35 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_client()->work_serializer_->Run( - [self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION); + bool done; + { + MutexLock lock(&self->xds_client()->mu_); + done = self->OnReportDoneLocked(GRPC_ERROR_REF(error)); + } + if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done"); } -void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( +bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( grpc_error* error) { grpc_byte_buffer_destroy(parent_->send_message_payload_); parent_->send_message_payload_ = nullptr; // If there are no more registered stats to report, cancel the call. if (xds_client()->load_report_map_.empty()) { parent_->chand()->StopLrsCall(); - Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters"); - return; + GRPC_ERROR_UNREF(error); + return true; } if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { + GRPC_ERROR_UNREF(error); // If this reporter is no longer the current one on the call, the reason // might be that it was orphaned for a new one due to config update. if (!IsCurrentReporterOnCall()) { parent_->MaybeStartReportingLocked(); } - Unref(DEBUG_LOCATION, "Reporter+report_done"); - } else { - ScheduleNextReportLocked(); + return true; } - GRPC_ERROR_UNREF(error); + ScheduleNextReportLocked(); + return false; } // @@ -1566,9 +1574,11 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); - lrs_calld->xds_client()->work_serializer_->Run( - [lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); }, - DEBUG_LOCATION); + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + lrs_calld->OnInitialRequestSentLocked(); + } + lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); } void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { @@ -1576,21 +1586,23 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { grpc_byte_buffer_destroy(send_message_payload_); send_message_payload_ = nullptr; MaybeStartReportingLocked(); - Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); } void XdsClient::ChannelState::LrsCallState::OnResponseReceived( void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); - lrs_calld->xds_client()->work_serializer_->Run( - [lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); + bool done; + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + done = lrs_calld->OnResponseReceivedLocked(); + } + if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); } -void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { +bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { // Empty payload means the call was cancelled. if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { - Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); - return; + return true; } // Read the response. grpc_byte_buffer_reader bbr; @@ -1663,10 +1675,7 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { MaybeStartReportingLocked(); }(); grpc_slice_unref_internal(response_slice); - if (xds_client()->shutting_down_) { - Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown"); - return; - } + if (xds_client()->shutting_down_) return true; // Keep listening for LRS config updates. grpc_op op; memset(&op, 0, sizeof(op)); @@ -1679,15 +1688,17 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { const grpc_call_error call_error = grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); + return false; } void XdsClient::ChannelState::LrsCallState::OnStatusReceived( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - lrs_calld->xds_client()->work_serializer_->Run( - [lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); }, - DEBUG_LOCATION); + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error)); + } + lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); } void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( @@ -1708,7 +1719,6 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( // Try to restart the call. parent_->OnCallFinishedLocked(); } - Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); GRPC_ERROR_UNREF(error); } @@ -1765,11 +1775,9 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap, } // namespace -XdsClient::XdsClient(std::shared_ptr work_serializer, - const grpc_channel_args& channel_args, grpc_error** error) +XdsClient::XdsClient(const grpc_channel_args& channel_args, grpc_error** error) : InternallyRefCounted(&grpc_xds_client_trace), request_timeout_(GetRequestTimeout(channel_args)), - work_serializer_(std::move(work_serializer)), interested_parties_(grpc_pollset_set_create()), bootstrap_( XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), @@ -1809,17 +1817,20 @@ void XdsClient::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this); } - shutting_down_ = true; - chand_.reset(); - // We do not clear cluster_map_ and endpoint_map_ if the xds client was - // created by the XdsResolver because the maps contain refs for watchers which - // in turn hold refs to the loadbalancing policies. At this point, it is - // possible for ADS calls to be in progress. Unreffing the loadbalancing - // policies before those calls are done would lead to issues such as - // https://github.com/grpc/grpc/issues/20928. - if (!listener_map_.empty()) { - cluster_map_.clear(); - endpoint_map_.clear(); + { + MutexLock lock(&mu_); + shutting_down_ = true; + chand_.reset(); + // We do not clear cluster_map_ and endpoint_map_ if the xds client was + // created by the XdsResolver because the maps contain refs for watchers + // which in turn hold refs to the loadbalancing policies. At this point, it + // is possible for ADS calls to be in progress. Unreffing the loadbalancing + // policies before those calls are done would lead to issues such as + // https://github.com/grpc/grpc/issues/20928. + if (!listener_map_.empty()) { + cluster_map_.clear(); + endpoint_map_.clear(); + } } Unref(DEBUG_LOCATION, "XdsClient::Orphan()"); } @@ -1828,6 +1839,7 @@ void XdsClient::WatchListenerData( absl::string_view listener_name, std::unique_ptr watcher) { std::string listener_name_str = std::string(listener_name); + MutexLock lock(&mu_); ListenerState& listener_state = listener_map_[listener_name_str]; ListenerWatcherInterface* w = watcher.get(); listener_state.watchers[w] = std::move(watcher); @@ -1846,6 +1858,7 @@ void XdsClient::WatchListenerData( void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, ListenerWatcherInterface* watcher, bool delay_unsubscription) { + MutexLock lock(&mu_); if (shutting_down_) return; std::string listener_name_str = std::string(listener_name); ListenerState& listener_state = listener_map_[listener_name_str]; @@ -1864,6 +1877,7 @@ void XdsClient::WatchRouteConfigData( absl::string_view route_config_name, std::unique_ptr watcher) { std::string route_config_name_str = std::string(route_config_name); + MutexLock lock(&mu_); RouteConfigState& route_config_state = route_config_map_[route_config_name_str]; RouteConfigWatcherInterface* w = watcher.get(); @@ -1884,6 +1898,7 @@ void XdsClient::WatchRouteConfigData( void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, RouteConfigWatcherInterface* watcher, bool delay_unsubscription) { + MutexLock lock(&mu_); if (shutting_down_) return; std::string route_config_name_str = std::string(route_config_name); RouteConfigState& route_config_state = @@ -1903,6 +1918,7 @@ void XdsClient::WatchClusterData( absl::string_view cluster_name, std::unique_ptr watcher) { std::string cluster_name_str = std::string(cluster_name); + MutexLock lock(&mu_); ClusterState& cluster_state = cluster_map_[cluster_name_str]; ClusterWatcherInterface* w = watcher.get(); cluster_state.watchers[w] = std::move(watcher); @@ -1921,6 +1937,7 @@ void XdsClient::WatchClusterData( void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, ClusterWatcherInterface* watcher, bool delay_unsubscription) { + MutexLock lock(&mu_); if (shutting_down_) return; std::string cluster_name_str = std::string(cluster_name); ClusterState& cluster_state = cluster_map_[cluster_name_str]; @@ -1939,6 +1956,7 @@ void XdsClient::WatchEndpointData( absl::string_view eds_service_name, std::unique_ptr watcher) { std::string eds_service_name_str = std::string(eds_service_name); + MutexLock lock(&mu_); EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; EndpointWatcherInterface* w = watcher.get(); endpoint_state.watchers[w] = std::move(watcher); @@ -1957,6 +1975,7 @@ void XdsClient::WatchEndpointData( void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, EndpointWatcherInterface* watcher, bool delay_unsubscription) { + MutexLock lock(&mu_); if (shutting_down_) return; std::string eds_service_name_str = std::string(eds_service_name); EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; @@ -1978,6 +1997,7 @@ RefCountedPtr XdsClient::AddClusterDropStats( // server name specified in lrs_server. auto key = std::make_pair(std::string(cluster_name), std::string(eds_service_name)); + MutexLock lock(&mu_); // We jump through some hoops here to make sure that the absl::string_views // stored in the XdsClusterDropStats object point to the strings // in the load_report_map_ key, so that they have the same lifetime. @@ -1996,6 +2016,7 @@ void XdsClient::RemoveClusterDropStats( absl::string_view /*lrs_server*/, absl::string_view cluster_name, absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats) { + MutexLock lock(&mu_); auto load_report_it = load_report_map_.find( std::make_pair(std::string(cluster_name), std::string(eds_service_name))); if (load_report_it == load_report_map_.end()) return; @@ -2021,6 +2042,7 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( // server name specified in lrs_server. auto key = std::make_pair(std::string(cluster_name), std::string(eds_service_name)); + MutexLock lock(&mu_); // We jump through some hoops here to make sure that the absl::string_views // stored in the XdsClusterLocalityStats object point to the strings // in the load_report_map_ key, so that they have the same lifetime. @@ -2042,6 +2064,7 @@ void XdsClient::RemoveClusterLocalityStats( absl::string_view eds_service_name, const RefCountedPtr& locality, XdsClusterLocalityStats* cluster_locality_stats) { + MutexLock lock(&mu_); auto load_report_it = load_report_map_.find( std::make_pair(std::string(cluster_name), std::string(eds_service_name))); if (load_report_it == load_report_map_.end()) return; @@ -2062,12 +2085,41 @@ void XdsClient::RemoveClusterLocalityStats( } void XdsClient::ResetBackoff() { + MutexLock lock(&mu_); if (chand_ != nullptr) { grpc_channel_reset_connect_backoff(chand_->channel()); } } -XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot( +void XdsClient::NotifyOnErrorLocked(grpc_error* error) { + for (const auto& p : listener_map_) { + const ListenerState& listener_state = p.second; + for (const auto& p : listener_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + } + for (const auto& p : route_config_map_) { + const RouteConfigState& route_config_state = p.second; + for (const auto& p : route_config_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + } + for (const auto& p : cluster_map_) { + const ClusterState& cluster_state = p.second; + for (const auto& p : cluster_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + } + for (const auto& p : endpoint_map_) { + const EndpointState& endpoint_state = p.second; + for (const auto& p : endpoint_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + } + GRPC_ERROR_UNREF(error); +} + +XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set& clusters) { XdsApi::ClusterLoadReportMap snapshot_map; for (auto load_report_it = load_report_map_.begin(); @@ -2135,34 +2187,6 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot( return snapshot_map; } -void XdsClient::NotifyOnError(grpc_error* error) { - for (const auto& p : listener_map_) { - const ListenerState& listener_state = p.second; - for (const auto& p : listener_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); - } - } - for (const auto& p : route_config_map_) { - const RouteConfigState& route_config_state = p.second; - for (const auto& p : route_config_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); - } - } - for (const auto& p : cluster_map_) { - const ClusterState& cluster_state = p.second; - for (const auto& p : cluster_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); - } - } - for (const auto& p : endpoint_map_) { - const EndpointState& endpoint_state = p.second; - for (const auto& p : endpoint_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); - } - } - GRPC_ERROR_UNREF(error); -} - void* XdsClient::ChannelArgCopy(void* p) { XdsClient* xds_client = static_cast(p); xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 16e67dc13cd..3fd7c68340c 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -33,7 +33,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/work_serializer.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_core { @@ -91,8 +91,7 @@ class XdsClient : public InternallyRefCounted { // If *error is not GRPC_ERROR_NONE after construction, then there was // an error initializing the client. - XdsClient(std::shared_ptr work_serializer, - const grpc_channel_args& channel_args, grpc_error** error); + XdsClient(const grpc_channel_args& channel_args, grpc_error** error); ~XdsClient(); grpc_pollset_set* interested_parties() const { return interested_parties_; } @@ -286,9 +285,9 @@ class XdsClient : public InternallyRefCounted { }; // Sends an error notification to all watchers. - void NotifyOnError(grpc_error* error); + void NotifyOnErrorLocked(grpc_error* error); - XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot( + XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set& clusters); // Channel arg vtable functions. @@ -300,7 +299,7 @@ class XdsClient : public InternallyRefCounted { const grpc_millis request_timeout_; - std::shared_ptr work_serializer_; + Mutex mu_; grpc_pollset_set* interested_parties_; std::unique_ptr bootstrap_; diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index f8a945eb035..6b1d68cf8ef 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -73,8 +73,8 @@ class AsyncConnectivityStateWatcherInterface protected: class Notifier; - // If \a combiner is nullptr, then the notification will be scheduled on the - // ExecCtx. + // If \a work_serializer is nullptr, then the notification will be scheduled + // on the ExecCtx. explicit AsyncConnectivityStateWatcherInterface( std::shared_ptr work_serializer = nullptr) : work_serializer_(std::move(work_serializer)) {}