|
|
|
@ -49,27 +49,22 @@ TraceFlag grpc_lb_eds_trace(false, "eds_lb"); |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
constexpr char kXds[] = "xds_experimental"; |
|
|
|
|
constexpr char kEds[] = "eds_experimental"; |
|
|
|
|
|
|
|
|
|
// Config for EDS LB policy.
|
|
|
|
|
class EdsLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
public: |
|
|
|
|
EdsLbConfig(const char* name, std::string cluster_name, |
|
|
|
|
std::string eds_service_name, |
|
|
|
|
EdsLbConfig(std::string cluster_name, std::string eds_service_name, |
|
|
|
|
absl::optional<std::string> lrs_load_reporting_server_name, |
|
|
|
|
Json locality_picking_policy, Json endpoint_picking_policy, |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy) |
|
|
|
|
: name_(name), |
|
|
|
|
cluster_name_(std::move(cluster_name)), |
|
|
|
|
Json locality_picking_policy, Json endpoint_picking_policy) |
|
|
|
|
: cluster_name_(std::move(cluster_name)), |
|
|
|
|
eds_service_name_(std::move(eds_service_name)), |
|
|
|
|
lrs_load_reporting_server_name_( |
|
|
|
|
std::move(lrs_load_reporting_server_name)), |
|
|
|
|
locality_picking_policy_(std::move(locality_picking_policy)), |
|
|
|
|
endpoint_picking_policy_(std::move(endpoint_picking_policy)), |
|
|
|
|
fallback_policy_(std::move(fallback_policy)) {} |
|
|
|
|
endpoint_picking_policy_(std::move(endpoint_picking_policy)) {} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return name_; } |
|
|
|
|
const char* name() const override { return kEds; } |
|
|
|
|
|
|
|
|
|
const std::string& cluster_name() const { return cluster_name_; } |
|
|
|
|
const std::string& eds_service_name() const { return eds_service_name_; } |
|
|
|
@ -82,26 +77,21 @@ class EdsLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
const Json& endpoint_picking_policy() const { |
|
|
|
|
return endpoint_picking_policy_; |
|
|
|
|
} |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const { |
|
|
|
|
return fallback_policy_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
const char* name_; |
|
|
|
|
std::string cluster_name_; |
|
|
|
|
std::string eds_service_name_; |
|
|
|
|
absl::optional<std::string> lrs_load_reporting_server_name_; |
|
|
|
|
Json locality_picking_policy_; |
|
|
|
|
Json endpoint_picking_policy_; |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// EDS LB policy.
|
|
|
|
|
class EdsLb : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
|
EdsLb(const char* name, Args args); |
|
|
|
|
explicit EdsLb(Args args); |
|
|
|
|
|
|
|
|
|
const char* name() const override { return name_; } |
|
|
|
|
const char* name() const override { return kEds; } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(UpdateArgs args) override; |
|
|
|
|
void ResetBackoffLocked() override; |
|
|
|
@ -153,24 +143,6 @@ class EdsLb : public LoadBalancingPolicy { |
|
|
|
|
RefCountedPtr<EdsLb> eds_policy_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class FallbackHelper : public ChannelControlHelper { |
|
|
|
|
public: |
|
|
|
|
explicit FallbackHelper(RefCountedPtr<EdsLb> parent) |
|
|
|
|
: parent_(std::move(parent)) {} |
|
|
|
|
|
|
|
|
|
~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
|
void UpdateState(grpc_connectivity_state state, |
|
|
|
|
std::unique_ptr<SubchannelPicker> picker) override; |
|
|
|
|
void RequestReresolution() override; |
|
|
|
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
RefCountedPtr<EdsLb> parent_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
~EdsLb(); |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
@ -185,15 +157,6 @@ class EdsLb : public LoadBalancingPolicy { |
|
|
|
|
const grpc_channel_args* args_in); |
|
|
|
|
void MaybeUpdateDropPickerLocked(); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with fallback state.
|
|
|
|
|
void MaybeCancelFallbackAtStartupChecks(); |
|
|
|
|
static void OnFallbackTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void UpdateFallbackPolicyLocked(); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked( |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
void MaybeExitFallbackMode(); |
|
|
|
|
|
|
|
|
|
// Caller must ensure that config_ is set before calling.
|
|
|
|
|
const StringView GetEdsResourceName() const { |
|
|
|
|
if (xds_client_from_channel_ == nullptr) return server_name_; |
|
|
|
@ -216,9 +179,6 @@ class EdsLb : public LoadBalancingPolicy { |
|
|
|
|
: xds_client_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Policy name (kXds or kEds).
|
|
|
|
|
const char* name_; |
|
|
|
|
|
|
|
|
|
// Server name from target URI.
|
|
|
|
|
std::string server_name_; |
|
|
|
|
|
|
|
|
@ -251,26 +211,6 @@ class EdsLb : public LoadBalancingPolicy { |
|
|
|
|
// The latest state and picker returned from the child policy.
|
|
|
|
|
grpc_connectivity_state child_state_; |
|
|
|
|
RefCountedPtr<ChildPickerWrapper> child_picker_; |
|
|
|
|
|
|
|
|
|
// Non-null iff we are in fallback mode.
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> fallback_policy_; |
|
|
|
|
|
|
|
|
|
// Whether the checks for fallback at startup are ALL pending. There are
|
|
|
|
|
// several cases where this can be reset:
|
|
|
|
|
// 1. The fallback timer fires, we enter fallback mode.
|
|
|
|
|
// 2. Before the fallback timer fires, the endpoint watcher reports an
|
|
|
|
|
// error, we enter fallback mode.
|
|
|
|
|
// 3. Before the fallback timer fires, if any child policy in the locality map
|
|
|
|
|
// becomes READY, we cancel the fallback timer.
|
|
|
|
|
bool fallback_at_startup_checks_pending_ = false; |
|
|
|
|
// Timeout in milliseconds for before using fallback backend addresses.
|
|
|
|
|
// 0 means not using fallback.
|
|
|
|
|
const grpc_millis lb_fallback_timeout_ms_; |
|
|
|
|
// The backend addresses from the resolver.
|
|
|
|
|
ServerAddressList fallback_backend_addresses_; |
|
|
|
|
// Fallback timer.
|
|
|
|
|
grpc_timer lb_fallback_timer_; |
|
|
|
|
grpc_closure lb_on_fallback_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -331,15 +271,6 @@ void EdsLb::Helper::UpdateState(grpc_connectivity_state state, |
|
|
|
|
eds_policy_->child_state_ = state; |
|
|
|
|
eds_policy_->child_picker_ = |
|
|
|
|
MakeRefCounted<ChildPickerWrapper>(std::move(picker)); |
|
|
|
|
// If the new state is READY, cancel the fallback-at-startup checks.
|
|
|
|
|
if (state == GRPC_CHANNEL_READY) { |
|
|
|
|
eds_policy_->MaybeCancelFallbackAtStartupChecks(); |
|
|
|
|
eds_policy_->MaybeExitFallbackMode(); |
|
|
|
|
} |
|
|
|
|
// TODO(roth): If the child reports TRANSIENT_FAILURE and the
|
|
|
|
|
// fallback-at-startup checks are pending, we should probably go into
|
|
|
|
|
// fallback mode immediately (cancelling the fallback-at-startup timer
|
|
|
|
|
// if needed).
|
|
|
|
|
// Wrap the picker in a DropPicker and pass it up.
|
|
|
|
|
eds_policy_->MaybeUpdateDropPickerLocked(); |
|
|
|
|
} |
|
|
|
@ -349,33 +280,6 @@ void EdsLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { |
|
|
|
|
eds_policy_->channel_control_helper()->AddTraceEvent(severity, message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// EdsLb::FallbackHelper
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> EdsLb::FallbackHelper::CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) { |
|
|
|
|
if (parent_->shutting_down_) return nullptr; |
|
|
|
|
return parent_->channel_control_helper()->CreateSubchannel(args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::FallbackHelper::UpdateState( |
|
|
|
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
parent_->channel_control_helper()->UpdateState(state, std::move(picker)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::FallbackHelper::RequestReresolution() { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
parent_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
StringView message) { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
parent_->channel_control_helper()->AddTraceEvent(severity, message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// EdsLb::EndpointWatcher
|
|
|
|
|
//
|
|
|
|
@ -392,9 +296,6 @@ class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", |
|
|
|
|
eds_policy_.get()); |
|
|
|
|
} |
|
|
|
|
// If the balancer tells us to drop all the calls, we should exit fallback
|
|
|
|
|
// mode immediately.
|
|
|
|
|
if (update.drop_config->drop_all()) eds_policy_->MaybeExitFallbackMode(); |
|
|
|
|
// Update the drop config.
|
|
|
|
|
const bool drop_config_changed = |
|
|
|
|
eds_policy_->drop_config_ == nullptr || |
|
|
|
@ -424,34 +325,18 @@ class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void OnError(grpc_error* error) override { |
|
|
|
|
// If the fallback-at-startup checks are pending, go into fallback mode
|
|
|
|
|
// immediately. This short-circuits the timeout for the
|
|
|
|
|
// fallback-at-startup case.
|
|
|
|
|
if (eds_policy_->fallback_at_startup_checks_pending_) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[edslb %p] xds watcher reported error; entering fallback " |
|
|
|
|
"mode: %s", |
|
|
|
|
eds_policy_.get(), grpc_error_string(error)); |
|
|
|
|
eds_policy_->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&eds_policy_->lb_fallback_timer_); |
|
|
|
|
eds_policy_->UpdateFallbackPolicyLocked(); |
|
|
|
|
// If the xds call failed, request re-resolution.
|
|
|
|
|
// TODO(roth): We check the error string contents here to
|
|
|
|
|
// differentiate between the xds call failing and the xds channel
|
|
|
|
|
// going into TRANSIENT_FAILURE. This is a pretty ugly hack,
|
|
|
|
|
// but it's okay for now, since we're not yet sure whether we will
|
|
|
|
|
// continue to support the current fallback functionality. If we
|
|
|
|
|
// decide to keep the fallback approach, then we should either
|
|
|
|
|
// find a cleaner way to expose the difference between these two
|
|
|
|
|
// cases or decide that we're okay re-resolving in both cases.
|
|
|
|
|
// Note that even if we do keep the current fallback functionality,
|
|
|
|
|
// this re-resolution will only be necessary if we are going to be
|
|
|
|
|
// using this LB policy with resolvers other than the xds resolver.
|
|
|
|
|
if (strstr(grpc_error_string(error), "xds call failed")) { |
|
|
|
|
eds_policy_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", |
|
|
|
|
eds_policy_.get(), grpc_error_string(error)); |
|
|
|
|
// Go into TRANSIENT_FAILURE if we have not yet created the child
|
|
|
|
|
// policy (i.e., we have not yet received data from xds). Otherwise,
|
|
|
|
|
// we keep running with the data we had previously.
|
|
|
|
|
if (eds_policy_->child_policy_ == nullptr) { |
|
|
|
|
eds_policy_->channel_control_helper()->UpdateState( |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
absl::make_unique<TransientFailurePicker>(error)); |
|
|
|
|
} else { |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -462,13 +347,9 @@ class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { |
|
|
|
|
// EdsLb public methods
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
EdsLb::EdsLb(const char* name, Args args) |
|
|
|
|
EdsLb::EdsLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
name_(name), |
|
|
|
|
xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)), |
|
|
|
|
lb_fallback_timeout_ms_(grpc_channel_args_find_integer( |
|
|
|
|
args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, |
|
|
|
|
{GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT, 0, INT_MAX})) { |
|
|
|
|
xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] created -- xds client from channel: %p", this, |
|
|
|
|
xds_client_from_channel_.get()); |
|
|
|
@ -499,7 +380,6 @@ void EdsLb::ShutdownLocked() { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] shutting down", this); |
|
|
|
|
} |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
MaybeCancelFallbackAtStartupChecks(); |
|
|
|
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
|
|
|
|
// the child.
|
|
|
|
|
child_picker_.reset(); |
|
|
|
@ -508,11 +388,6 @@ void EdsLb::ShutdownLocked() { |
|
|
|
|
interested_parties()); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
if (fallback_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
fallback_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
drop_stats_.reset(); |
|
|
|
|
// Cancel the endpoint watch here instead of in our dtor if we are using the
|
|
|
|
|
// xds resolver, because the watcher holds a ref to us and we might not be
|
|
|
|
@ -540,15 +415,10 @@ void EdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// Update config.
|
|
|
|
|
auto old_config = std::move(config_); |
|
|
|
|
config_ = std::move(args.config); |
|
|
|
|
// Update fallback address list.
|
|
|
|
|
fallback_backend_addresses_ = std::move(args.addresses); |
|
|
|
|
// Update args.
|
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
|
args_ = args.args; |
|
|
|
|
args.args = nullptr; |
|
|
|
|
// Update the existing fallback policy. The fallback policy config and/or the
|
|
|
|
|
// fallback addresses may be new.
|
|
|
|
|
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); |
|
|
|
|
if (is_initial_update) { |
|
|
|
|
// Initialize XdsClient.
|
|
|
|
|
if (xds_client_from_channel_ == nullptr) { |
|
|
|
@ -556,7 +426,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
xds_client_ = MakeOrphanable<XdsClient>( |
|
|
|
|
combiner(), interested_parties(), GetEdsResourceName(), |
|
|
|
|
nullptr /* service config watcher */, *args_, &error); |
|
|
|
|
// TODO(roth): If we decide that we care about fallback mode, add
|
|
|
|
|
// TODO(roth): If we decide that we care about EDS-only mode, add
|
|
|
|
|
// proper error handling here.
|
|
|
|
|
GPR_ASSERT(error == GRPC_ERROR_NONE); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { |
|
|
|
@ -564,13 +434,6 @@ void EdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
xds_client_.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Start fallback-at-startup checks.
|
|
|
|
|
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; |
|
|
|
|
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
|
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_fallback_, &EdsLb::OnFallbackTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
fallback_at_startup_checks_pending_ = true; |
|
|
|
|
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
|
|
|
|
} |
|
|
|
|
// Update drop stats for load reporting if needed.
|
|
|
|
|
if (is_initial_update || config_->lrs_load_reporting_server_name() != |
|
|
|
@ -609,9 +472,6 @@ void EdsLb::ResetBackoffLocked() { |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
if (fallback_policy_ != nullptr) { |
|
|
|
|
fallback_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -875,8 +735,6 @@ OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::MaybeUpdateDropPickerLocked() { |
|
|
|
|
// If we are in fallback mode, don't override the picker.
|
|
|
|
|
if (fallback_policy_ != nullptr) return; |
|
|
|
|
// If we're dropping all calls, report READY, regardless of what (or
|
|
|
|
|
// whether) the child has reported.
|
|
|
|
|
if (drop_config_ != nullptr && drop_config_->drop_all()) { |
|
|
|
@ -891,111 +749,24 @@ void EdsLb::MaybeUpdateDropPickerLocked() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// fallback-related methods
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void EdsLb::MaybeCancelFallbackAtStartupChecks() { |
|
|
|
|
if (!fallback_at_startup_checks_pending_) return; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] Cancelling fallback timer", this); |
|
|
|
|
} |
|
|
|
|
grpc_timer_cancel(&lb_fallback_timer_); |
|
|
|
|
fallback_at_startup_checks_pending_ = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::OnFallbackTimer(void* arg, grpc_error* error) { |
|
|
|
|
EdsLb* edslb_policy = static_cast<EdsLb*>(arg); |
|
|
|
|
edslb_policy->combiner()->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&edslb_policy->lb_on_fallback_, |
|
|
|
|
&EdsLb::OnFallbackTimerLocked, edslb_policy, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
EdsLb* edslb_policy = static_cast<EdsLb*>(arg); |
|
|
|
|
// If some fallback-at-startup check is done after the timer fires but before
|
|
|
|
|
// this callback actually runs, don't fall back.
|
|
|
|
|
if (edslb_policy->fallback_at_startup_checks_pending_ && |
|
|
|
|
!edslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[edslb %p] Child policy not ready after fallback timeout; " |
|
|
|
|
"entering fallback mode", |
|
|
|
|
edslb_policy); |
|
|
|
|
edslb_policy->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
edslb_policy->UpdateFallbackPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
edslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::UpdateFallbackPolicyLocked() { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
// Create policy if needed.
|
|
|
|
|
if (fallback_policy_ == nullptr) { |
|
|
|
|
fallback_policy_ = CreateFallbackPolicyLocked(args_); |
|
|
|
|
} |
|
|
|
|
// Construct update args.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.addresses = fallback_backend_addresses_; |
|
|
|
|
update_args.config = config_->fallback_policy(); |
|
|
|
|
update_args.args = grpc_channel_args_copy(args_); |
|
|
|
|
// Update the policy.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] Updating fallback child policy handler %p", |
|
|
|
|
this, fallback_policy_.get()); |
|
|
|
|
} |
|
|
|
|
fallback_policy_->UpdateLocked(std::move(update_args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateFallbackPolicyLocked( |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.combiner = combiner(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
|
lb_policy_args.channel_control_helper = |
|
|
|
|
absl::make_unique<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper")); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
|
|
|
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
|
|
|
|
&grpc_lb_eds_trace); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] Created new fallback child policy handler %p", |
|
|
|
|
this, lb_policy.get()); |
|
|
|
|
} |
|
|
|
|
// Add our interested_parties pollset_set to that of the newly created
|
|
|
|
|
// child policy. This will make the child policy progress upon activity on
|
|
|
|
|
// this policy, which in turn is tied to the application's call.
|
|
|
|
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
return lb_policy; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EdsLb::MaybeExitFallbackMode() { |
|
|
|
|
if (fallback_policy_ == nullptr) return; |
|
|
|
|
gpr_log(GPR_INFO, "[edslb %p] Exiting fallback mode", this); |
|
|
|
|
fallback_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// factory
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
public: |
|
|
|
|
explicit EdsLbFactory(const char* name) : name_(name) {} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
|
|
|
|
LoadBalancingPolicy::Args args) const override { |
|
|
|
|
return MakeOrphanable<EdsChildHandler>(std::move(args), &grpc_lb_eds_trace, |
|
|
|
|
name_); |
|
|
|
|
return MakeOrphanable<EdsChildHandler>(std::move(args), &grpc_lb_eds_trace); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return name_; } |
|
|
|
|
const char* name() const override { return kEds; } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
|
|
|
|
const Json& json, grpc_error** error) const override { |
|
|
|
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
|
|
|
|
if (json.type() == Json::Type::JSON_NULL) { |
|
|
|
|
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
|
|
|
|
|
// eds was mentioned as a policy in the deprecated loadBalancingPolicy
|
|
|
|
|
// field or in the client API.
|
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:loadBalancingPolicy error:eds policy requires configuration. " |
|
|
|
@ -1016,21 +787,15 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
} |
|
|
|
|
// Cluster name.
|
|
|
|
|
std::string cluster_name; |
|
|
|
|
if (name_ == kEds) { |
|
|
|
|
it = json.object_value().find("clusterName"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:required field missing")); |
|
|
|
|
} else if (it->second.type() != Json::Type::STRING) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:type should be string")); |
|
|
|
|
} else { |
|
|
|
|
cluster_name = it->second.string_value(); |
|
|
|
|
} |
|
|
|
|
it = json.object_value().find("clusterName"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:required field missing")); |
|
|
|
|
} else if (it->second.type() != Json::Type::STRING) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:type should be string")); |
|
|
|
|
} else { |
|
|
|
|
// For xds policy, this field does not exist in the config, so it
|
|
|
|
|
// will always be set to the same value as edsServiceName.
|
|
|
|
|
cluster_name = eds_service_name; |
|
|
|
|
cluster_name = it->second.string_value(); |
|
|
|
|
} |
|
|
|
|
// LRS load reporting server name.
|
|
|
|
|
absl::optional<std::string> lrs_load_reporting_server_name; |
|
|
|
@ -1043,20 +808,20 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
lrs_load_reporting_server_name.emplace(it->second.string_value()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Locality-picking policy. Not supported for xds policy.
|
|
|
|
|
Json locality_picking_policy = Json::Array{ |
|
|
|
|
Json::Object{ |
|
|
|
|
{"weighted_target_experimental", |
|
|
|
|
Json::Object{ |
|
|
|
|
{"targets", Json::Object()}, |
|
|
|
|
}}, |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
if (name_ == kEds) { |
|
|
|
|
it = json.object_value().find("localityPickingPolicy"); |
|
|
|
|
if (it != json.object_value().end()) { |
|
|
|
|
locality_picking_policy = it->second; |
|
|
|
|
} |
|
|
|
|
// Locality-picking policy.
|
|
|
|
|
Json locality_picking_policy; |
|
|
|
|
it = json.object_value().find("localityPickingPolicy"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
locality_picking_policy = Json::Array{ |
|
|
|
|
Json::Object{ |
|
|
|
|
{"weighted_target_experimental", |
|
|
|
|
Json::Object{ |
|
|
|
|
{"targets", Json::Object()}, |
|
|
|
|
}}, |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
} else { |
|
|
|
|
locality_picking_policy = it->second; |
|
|
|
|
} |
|
|
|
|
grpc_error* parse_error = GRPC_ERROR_NONE; |
|
|
|
|
if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
|
|
|
@ -1067,10 +832,8 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
} |
|
|
|
|
// Endpoint-picking policy. Called "childPolicy" for xds policy.
|
|
|
|
|
const char* field_name = |
|
|
|
|
name_ == kEds ? "endpointPickingPolicy" : "childPolicy"; |
|
|
|
|
Json endpoint_picking_policy; |
|
|
|
|
it = json.object_value().find(field_name); |
|
|
|
|
it = json.object_value().find("endpointPickingPolicy"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
endpoint_picking_policy = Json::Array{ |
|
|
|
|
Json::Object{ |
|
|
|
@ -1085,36 +848,16 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
endpoint_picking_policy, &parse_error) == nullptr) { |
|
|
|
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
field_name, &parse_error, 1)); |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
} |
|
|
|
|
// Fallback policy.
|
|
|
|
|
Json fallback_policy_config; |
|
|
|
|
it = json.object_value().find("fallbackPolicy"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
fallback_policy_config = Json::Array{Json::Object{ |
|
|
|
|
{"round_robin", Json::Object()}, |
|
|
|
|
}}; |
|
|
|
|
} else { |
|
|
|
|
fallback_policy_config = it->second; |
|
|
|
|
} |
|
|
|
|
parse_error = GRPC_ERROR_NONE; |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy = |
|
|
|
|
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
|
|
|
|
fallback_policy_config, &parse_error); |
|
|
|
|
if (fallback_policy == nullptr) { |
|
|
|
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
"fallbackPolicy", &parse_error, 1)); |
|
|
|
|
"endpointPickingPolicy", &parse_error, 1)); |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
error_list.push_back(parse_error); |
|
|
|
|
} |
|
|
|
|
// Construct config.
|
|
|
|
|
if (error_list.empty()) { |
|
|
|
|
return MakeRefCounted<EdsLbConfig>( |
|
|
|
|
name_, std::move(cluster_name), std::move(eds_service_name), |
|
|
|
|
std::move(cluster_name), std::move(eds_service_name), |
|
|
|
|
std::move(lrs_load_reporting_server_name), |
|
|
|
|
std::move(locality_picking_policy), |
|
|
|
|
std::move(endpoint_picking_policy), std::move(fallback_policy)); |
|
|
|
|
std::move(endpoint_picking_policy)); |
|
|
|
|
} else { |
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
|
|
|
|
"eds_experimental LB policy config", &error_list); |
|
|
|
@ -1125,14 +868,14 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
private: |
|
|
|
|
class EdsChildHandler : public ChildPolicyHandler { |
|
|
|
|
public: |
|
|
|
|
EdsChildHandler(Args args, TraceFlag* tracer, const char* name) |
|
|
|
|
: ChildPolicyHandler(std::move(args), tracer), name_(name) {} |
|
|
|
|
EdsChildHandler(Args args, TraceFlag* tracer) |
|
|
|
|
: ChildPolicyHandler(std::move(args), tracer) {} |
|
|
|
|
|
|
|
|
|
bool ConfigChangeRequiresNewPolicyInstance( |
|
|
|
|
LoadBalancingPolicy::Config* old_config, |
|
|
|
|
LoadBalancingPolicy::Config* new_config) const override { |
|
|
|
|
GPR_ASSERT(old_config->name() == name_); |
|
|
|
|
GPR_ASSERT(new_config->name() == name_); |
|
|
|
|
GPR_ASSERT(old_config->name() == kEds); |
|
|
|
|
GPR_ASSERT(new_config->name() == kEds); |
|
|
|
|
EdsLbConfig* old_eds_config = static_cast<EdsLbConfig*>(old_config); |
|
|
|
|
EdsLbConfig* new_eds_config = static_cast<EdsLbConfig*>(new_config); |
|
|
|
|
return old_eds_config->cluster_name() != new_eds_config->cluster_name() || |
|
|
|
@ -1142,14 +885,9 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
|
|
|
|
const char* name, LoadBalancingPolicy::Args args) const override { |
|
|
|
|
return MakeOrphanable<EdsLb>(name_, std::move(args)); |
|
|
|
|
return MakeOrphanable<EdsLb>(std::move(args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
const char* name_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
const char* name_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
@ -1163,13 +901,7 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
void grpc_lb_policy_eds_init() { |
|
|
|
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
|
|
|
|
RegisterLoadBalancingPolicyFactory( |
|
|
|
|
absl::make_unique<grpc_core::EdsLbFactory>(grpc_core::kEds)); |
|
|
|
|
// TODO(roth): This is here just for backward compatibility with some
|
|
|
|
|
// old tests we have internally. Remove this once they are upgraded
|
|
|
|
|
// to use the new policy name and config.
|
|
|
|
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
|
|
|
|
RegisterLoadBalancingPolicyFactory( |
|
|
|
|
absl::make_unique<grpc_core::EdsLbFactory>(grpc_core::kXds)); |
|
|
|
|
absl::make_unique<grpc_core::EdsLbFactory>()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_lb_policy_eds_shutdown() {} |
|
|
|
|