modifications for xds.cc

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent a17bbbd840
commit 4b1223f875
  1. 523
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 74
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc

@ -793,12 +793,6 @@ GrpcLb::BalancerCallState::BalancerCallState(
// Init other data associated with the LB call.
grpc_metadata_array_init(&lb_initial_metadata_recv_);
grpc_metadata_array_init(&lb_trailing_metadata_recv_);
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx);
}
GrpcLb::BalancerCallState::~BalancerCallState() {
@ -856,6 +850,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
self.release();
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -878,6 +874,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
// with the callback.
self = Ref(DEBUG_LOCATION, "on_message_received");
self.release();
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -894,6 +892,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
// This callback signals the end of the LB call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -1184,6 +1184,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
op.flags = 0;
op.reserved = nullptr;
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
GrpcLb::BalancerCallState::OnBalancerMessageReceived,
this, grpc_schedule_on_exec_ctx);
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_calld->lb_call_, &op, 1,
&lb_calld->lb_on_balancer_message_received_);
@ -1363,12 +1366,6 @@ GrpcLb::GrpcLb(Args args)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1461,6 +1458,8 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start timer.
grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
@ -1470,6 +1469,9 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
@ -1552,6 +1554,9 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
@ -1633,282 +1638,282 @@ void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&lb_on_call_retry_,
&GrpcLb::OnBalancerCallRetryTimerLocked, this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->retry_timer_callback_pending_ = false;
if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
grpclb_policy->lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
grpclb_policy);
}
grpclb_policy->StartBalancerCallLocked();
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->retry_timer_callback_pending_ = false;
if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
grpclb_policy->lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
grpclb_policy);
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
grpclb_policy->StartBalancerCallLocked();
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
//
// code for handling fallback mode
//
//
// code for handling fallback mode
//
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->fallback_at_startup_checks_pending_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->fallback_at_startup_checks_pending_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
//
// code for interacting with the child policy
//
//
// code for interacting with the child policy
//
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
InlinedVector<grpc_arg, 2> args_to_add;
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
}
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
}
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
name, lb_policy.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
name, lb_policy.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
update_args.addresses = fallback_backend_addresses_;
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
}
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
update_args.addresses = fallback_backend_addresses_;
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
}
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
policy_to_update->UpdateLocked(std::move(update_args));
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(update_args));
}
//
// factory
//
//
// factory
//
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<GrpcLb>(std::move(args));
}
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<GrpcLb>(std::move(args));
}
const char* name() const override { return kGrpclb; }
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:Duplicate entry"));
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(parse_error);
}
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:Duplicate entry"));
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(parse_error);
}
}
if (error_list.empty()) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(std::move(child_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;
}
}
};
if (error_list.empty()) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(std::move(child_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;
}
}
};
} // namespace
} // namespace
} // namespace grpc_core
//
// Plugin registration

@ -250,6 +250,7 @@ class XdsLb : public LoadBalancingPolicy {
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args);
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
XdsLb* xds_policy() const { return locality_map_->xds_policy(); }
@ -299,6 +300,8 @@ class XdsLb : public LoadBalancingPolicy {
private:
void OnLocalityStateUpdateLocked();
void UpdateConnectivityStateLocked();
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
static void OnFailoverTimer(void* arg, grpc_error* error);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
static void OnFailoverTimerLocked(void* arg, grpc_error* error);
@ -378,6 +381,7 @@ class XdsLb : public LoadBalancingPolicy {
// Methods for dealing with fallback state.
void MaybeCancelFallbackAtStartupChecks();
static void OnFallbackTimer(void* arg, grpc_error* error);
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void UpdateFallbackPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
@ -798,8 +802,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
fallback_at_startup_checks_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
@ -819,6 +823,14 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() {
fallback_at_startup_checks_pending_ = false;
}
void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
// If some fallback-at-startup check is done after the timer fires but before
@ -1103,10 +1115,9 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32,
xds_policy_.get(), priority_);
}
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, grpc_combiner_scheduler(xds_policy_->combiner()));
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimerLocked, this,
grpc_combiner_scheduler(xds_policy_->combiner()));
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
grpc_schedule_on_exec_ctx);
// Start the failover timer.
Ref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked").release();
grpc_timer_init(
@ -1222,6 +1233,8 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() {
xds_policy(), priority_,
xds_policy()->locality_retention_interval_ms_);
}
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(
&delayed_removal_timer_,
ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_,
@ -1350,6 +1363,15 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() {
}
}
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer(
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner->Run(
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
@ -1359,13 +1381,13 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
const bool keep = self->priority_list_update().Contains(self->priority_) &&
self->priority_ <= priority_list->current_priority();
if (!keep) {
// This check is to make sure we always delete the locality maps from the
// lowest priority even if the closures of the back-to-back timers are not
// run in FIFO order.
// This check is to make sure we always delete the locality maps from
// the lowest priority even if the closures of the back-to-back timers
// are not run in FIFO order.
// TODO(juanlishen): Eliminate unnecessary maintenance overhead for some
// deactivated locality maps when out-of-order closures are run.
// TODO(juanlishen): Check the timer implementation to see if this defense
// is necessary.
// TODO(juanlishen): Check the timer implementation to see if this
// defense is necessary.
if (self->priority_ == priority_list->LowestPriority()) {
priority_list->priorities_.pop_back();
} else {
@ -1380,6 +1402,15 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
self->Unref(DEBUG_LOCATION, "LocalityMap+timer");
}
void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg,
grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimerLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked(
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
@ -1402,8 +1433,6 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality(
gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(),
this, name_->AsHumanReadableString());
}
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, grpc_combiner_scheduler(xds_policy()->combiner()));
}
XdsLb::PriorityList::LocalityMap::Locality::~Locality() {
@ -1458,8 +1487,8 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked(
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call.
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
xds_policy()->interested_parties());
return lb_policy;
@ -1529,8 +1558,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
// TODO(juanlishen): If the child policy is not configured via service
// config, use whatever algorithm is specified by the balancer.
const char* child_policy_name =
xds_policy()->child_policy_config_ == nullptr
? "round_robin"
@ -1623,6 +1652,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() {
weight_ = 0;
// Start a timer to delete the locality.
Ref(DEBUG_LOCATION, "Locality+timer").release();
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(
&delayed_removal_timer_,
ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_,
@ -1630,6 +1661,15 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() {
delayed_removal_timer_callback_pending_ = true;
}
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer(
void* arg, grpc_error* error) {
Locality* self = static_cast<Locality*>(arg);
self->xds_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked(
void* arg, grpc_error* error) {
Locality* self = static_cast<Locality*>(arg);

Loading…
Cancel
Save