From 4b1223f8753c103760cf9a15660a6786859a607f Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 7 Oct 2019 17:17:12 -0700 Subject: [PATCH] modifications for xds.cc --- .../client_channel/lb_policy/grpclb/grpclb.cc | 523 +++++++++--------- .../client_channel/lb_policy/xds/xds.cc | 74 ++- 2 files changed, 321 insertions(+), 276 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index b9db8322123..ce549cc3618 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -793,12 +793,6 @@ GrpcLb::BalancerCallState::BalancerCallState( // Init other data associated with the LB call. grpc_metadata_array_init(&lb_initial_metadata_recv_); grpc_metadata_array_init(&lb_trailing_metadata_recv_); - GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_, - OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, - this, grpc_schedule_on_exec_ctx); } GrpcLb::BalancerCallState::~BalancerCallState() { @@ -856,6 +850,8 @@ void GrpcLb::BalancerCallState::StartQuery() { // with the callback. auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent"); self.release(); + GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute( lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -878,6 +874,8 @@ void GrpcLb::BalancerCallState::StartQuery() { // with the callback. 
self = Ref(DEBUG_LOCATION, "on_message_received"); self.release(); + GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_, + OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute( lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -894,6 +892,8 @@ void GrpcLb::BalancerCallState::StartQuery() { // This callback signals the end of the LB call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. + GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, + this, grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute( lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -1184,6 +1184,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( op.flags = 0; op.reserved = nullptr; // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery(). + GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_, + GrpcLb::BalancerCallState::OnBalancerMessageReceived, + lb_calld, grpc_schedule_on_exec_ctx); const grpc_call_error call_error = grpc_call_start_batch_and_execute( lb_calld->lb_call_, &op, 1, &lb_calld->lb_on_balancer_message_received_); @@ -1363,12 +1366,6 @@ GrpcLb::GrpcLb(Args args) .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { - // Initialization. - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, - &GrpcLb::OnBalancerChannelConnectivityChanged, this, - grpc_schedule_on_exec_ctx); // Record server name. 
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -1461,6 +1458,8 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { // Start timer. grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback + GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, + grpc_schedule_on_exec_ctx); grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); // Start watching the channel's connectivity state. If the channel // goes into state TRANSIENT_FAILURE before the timer fires, we go into @@ -1470,6 +1469,9 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); // Ref held by callback. Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release(); + GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, + &GrpcLb::OnBalancerChannelConnectivityChanged, this, + grpc_schedule_on_exec_ctx); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set(interested_parties()), @@ -1552,6 +1554,9 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, grpc_channel_stack_last_element( grpc_channel_get_channel_stack(self->lb_channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_, + &GrpcLb::OnBalancerChannelConnectivityChanged, self, + grpc_schedule_on_exec_ctx); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set( @@ -1633,282 +1638,282 @@ void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked, this, nullptr), GRPC_ERROR_REF(error)); +} - void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { - GrpcLb* 
grpclb_policy = static_cast(arg); - grpclb_policy->retry_timer_callback_pending_ = false; - if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE && - grpclb_policy->lb_calld_ == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", - grpclb_policy); - } - grpclb_policy->StartBalancerCallLocked(); +void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = static_cast(arg); + grpclb_policy->retry_timer_callback_pending_ = false; + if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE && + grpclb_policy->lb_calld_ == nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", + grpclb_policy); } - grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); + grpclb_policy->StartBalancerCallLocked(); } + grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); +} - // - // code for handling fallback mode - // +// +// code for handling fallback mode +// - void GrpcLb::MaybeEnterFallbackModeAfterStartup() { - // Enter fallback mode if all of the following are true: - // - We are not currently in fallback mode. - // - We are not currently waiting for the initial fallback timeout. - // - We are not currently in contact with the balancer. - // - The child policy is not in state READY. - if (!fallback_mode_ && !fallback_at_startup_checks_pending_ && - (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) && - !child_policy_ready_) { - gpr_log(GPR_INFO, - "[grpclb %p] lost contact with balancer and backends from " - "most recent serverlist; entering fallback mode", - this); - fallback_mode_ = true; - CreateOrUpdateChildPolicyLocked(); - } +void GrpcLb::MaybeEnterFallbackModeAfterStartup() { + // Enter fallback mode if all of the following are true: + // - We are not currently in fallback mode. 
+ // - We are not currently waiting for the initial fallback timeout. + // - We are not currently in contact with the balancer. + // - The child policy is not in state READY. + if (!fallback_mode_ && !fallback_at_startup_checks_pending_ && + (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) && + !child_policy_ready_) { + gpr_log(GPR_INFO, + "[grpclb %p] lost contact with balancer and backends from " + "most recent serverlist; entering fallback mode", + this); + fallback_mode_ = true; + CreateOrUpdateChildPolicyLocked(); } +} - void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { - GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->combiner()->Run( - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, - this, nullptr), - GRPC_ERROR_REF(error)); - } +void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); + grpclb_policy->combiner()->Run( + GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_, + &GrpcLb::OnFallbackTimerLocked, grpclb_policy, nullptr), + GRPC_ERROR_REF(error)); +} - void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { - GrpcLb* grpclb_policy = static_cast(arg); - // If we receive a serverlist after the timer fires but before this callback - // actually runs, don't fall back. 
- if (grpclb_policy->fallback_at_startup_checks_pending_ && - !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, - "[grpclb %p] No response from balancer after fallback timeout; " - "entering fallback mode", - grpclb_policy); - grpclb_policy->fallback_at_startup_checks_pending_ = false; - grpclb_policy->CancelBalancerChannelConnectivityWatchLocked(); - grpclb_policy->fallback_mode_ = true; - grpclb_policy->CreateOrUpdateChildPolicyLocked(); - } - grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); +void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = static_cast(arg); + // If we receive a serverlist after the timer fires but before this callback + // actually runs, don't fall back. + if (grpclb_policy->fallback_at_startup_checks_pending_ && + !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "[grpclb %p] No response from balancer after fallback timeout; " + "entering fallback mode", + grpclb_policy); + grpclb_policy->fallback_at_startup_checks_pending_ = false; + grpclb_policy->CancelBalancerChannelConnectivityWatchLocked(); + grpclb_policy->fallback_mode_ = true; + grpclb_policy->CreateOrUpdateChildPolicyLocked(); } + grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); +} - // - // code for interacting with the child policy - // +// +// code for interacting with the child policy +// - grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked( - bool is_backend_from_grpclb_load_balancer) { - InlinedVector args_to_add; +grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked( + bool is_backend_from_grpclb_load_balancer) { + InlinedVector args_to_add; + args_to_add.emplace_back(grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), + is_backend_from_grpclb_load_balancer)); + if (is_backend_from_grpclb_load_balancer) { args_to_add.emplace_back(grpc_channel_arg_integer_create( - const_cast( - 
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), - is_backend_from_grpclb_load_balancer)); - if (is_backend_from_grpclb_load_balancer) { - args_to_add.emplace_back(grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1)); - } - return grpc_channel_args_copy_and_add(args_, args_to_add.data(), - args_to_add.size()); + const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1)); } + return grpc_channel_args_copy_and_add(args_, args_to_add.data(), + args_to_add.size()); +} - OrphanablePtr GrpcLb::CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args) { - Helper* helper = New(Ref()); - LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = combiner(); - lb_policy_args.args = args; - lb_policy_args.channel_control_helper = - UniquePtr(helper); - OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this, - name); - return nullptr; - } - helper->set_child(lb_policy.get()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this, - name, lb_policy.get()); - } - // Add the gRPC LB's interested_parties pollset_set to that of the newly - // created child policy. This will make the child policy progress upon - // activity on gRPC LB, which in turn is tied to the application's call. 
- grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), - interested_parties()); - return lb_policy; +OrphanablePtr GrpcLb::CreateChildPolicyLocked( + const char* name, const grpc_channel_args* args) { + Helper* helper = New(Ref()); + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + UniquePtr(helper); + OrphanablePtr lb_policy = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + name, std::move(lb_policy_args)); + if (GPR_UNLIKELY(lb_policy == nullptr)) { + gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this, + name); + return nullptr; + } + helper->set_child(lb_policy.get()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this, + name, lb_policy.get()); } + // Add the gRPC LB's interested_parties pollset_set to that of the newly + // created child policy. This will make the child policy progress upon + // activity on gRPC LB, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} - void GrpcLb::CreateOrUpdateChildPolicyLocked() { - if (shutting_down_) return; - // Construct update args. - UpdateArgs update_args; - bool is_backend_from_grpclb_load_balancer = false; - if (fallback_mode_) { - // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't - // received any serverlist from the balancer, we use the fallback backends - // returned by the resolver. Note that the fallback backend list may be - // empty, in which case the new round_robin policy will keep the requested - // picks pending. - update_args.addresses = fallback_backend_addresses_; - } else { - update_args.addresses = serverlist_->GetServerAddressList( - lb_calld_ == nullptr ? 
nullptr : lb_calld_->client_stats()); - is_backend_from_grpclb_load_balancer = true; - } - update_args.args = - CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); - GPR_ASSERT(update_args.args != nullptr); - update_args.config = child_policy_config_; - // If the child policy name changes, we need to create a new child - // policy. When this happens, we leave child_policy_ as-is and store - // the new child policy in pending_child_policy_. Once the new child - // policy transitions into state READY, we swap it into child_policy_, - // replacing the original child policy. So pending_child_policy_ is - // non-null only between when we apply an update that changes the child - // policy name and when the new child reports state READY. - // - // Updates can arrive at any point during this transition. We always - // apply updates relative to the most recently created child policy, - // even if the most recent one is still in pending_child_policy_. This - // is true both when applying the updates to an existing child policy - // and when determining whether we need to create a new policy. - // - // As a result of this, there are several cases to consider here: - // - // 1. We have no existing child policy (i.e., we have started up but - // have not yet received a serverlist from the balancer or gone - // into fallback mode; in this case, both child_policy_ and - // pending_child_policy_ are null). In this case, we create a - // new child policy and store it in child_policy_. - // - // 2. We have an existing child policy and have no pending child policy - // from a previous update (i.e., either there has not been a - // previous update that changed the policy name, or we have already - // finished swapping in the new policy; in this case, child_policy_ - // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. 
If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in - // pending_child_policy_ and will later be swapped into - // child_policy_ by the helper when the new child transitions - // into state READY. - // - // 3. We have an existing child policy and have a pending child policy - // from a previous update (i.e., a previous update set - // pending_child_policy_ as per case 2b above and that policy has - // not yet transitioned into state READY and been swapped into - // child_policy_; in this case, both child_policy_ and - // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new - // policy is stored in pending_child_policy_ (replacing the one - // that was there before, which will be immediately shut down) - // and will later be swapped into child_policy_ by the helper - // when the new child transitions into state READY. - const char* child_policy_name = child_policy_config_ == nullptr - ? "round_robin" - : child_policy_config_->name(); - const bool create_policy = - // case 1 - child_policy_ == nullptr || - // case 2b - (pending_child_policy_ == nullptr && - strcmp(child_policy_->name(), child_policy_name) != 0) || - // case 3b - (pending_child_policy_ != nullptr && - strcmp(pending_child_policy_->name(), child_policy_name) != 0); - LoadBalancingPolicy* policy_to_update = nullptr; - if (create_policy) { - // Cases 1, 2b, and 3b: create a new child policy. - // If child_policy_ is null, we set it (case 1), else we set - // pending_child_policy_ (cases 2b and 3b). - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this, - child_policy_ == nullptr ? 
"" : "pending ", child_policy_name); - } - // Swap the policy into place. - auto& lb_policy = - child_policy_ == nullptr ? child_policy_ : pending_child_policy_; - lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args); - policy_to_update = lb_policy.get(); - } else { - // Cases 2a and 3a: update an existing policy. - // If we have a pending child policy, send the update to the pending - // policy (case 3a), else send it to the current policy (case 2a). - policy_to_update = pending_child_policy_ != nullptr - ? pending_child_policy_.get() - : child_policy_.get(); - } - GPR_ASSERT(policy_to_update != nullptr); - // Update the policy. +void GrpcLb::CreateOrUpdateChildPolicyLocked() { + if (shutting_down_) return; + // Construct update args. + UpdateArgs update_args; + bool is_backend_from_grpclb_load_balancer = false; + if (fallback_mode_) { + // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't + // received any serverlist from the balancer, we use the fallback backends + // returned by the resolver. Note that the fallback backend list may be + // empty, in which case the new round_robin policy will keep the requested + // picks pending. + update_args.addresses = fallback_backend_addresses_; + } else { + update_args.addresses = serverlist_->GetServerAddressList( + lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); + is_backend_from_grpclb_load_balancer = true; + } + update_args.args = + CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); + GPR_ASSERT(update_args.args != nullptr); + update_args.config = child_policy_config_; + // If the child policy name changes, we need to create a new child + // policy. When this happens, we leave child_policy_ as-is and store + // the new child policy in pending_child_policy_. Once the new child + // policy transitions into state READY, we swap it into child_policy_, + // replacing the original child policy. 
So pending_child_policy_ is + // non-null only between when we apply an update that changes the child + // policy name and when the new child reports state READY. + // + // Updates can arrive at any point during this transition. We always + // apply updates relative to the most recently created child policy, + // even if the most recent one is still in pending_child_policy_. This + // is true both when applying the updates to an existing child policy + // and when determining whether we need to create a new policy. + // + // As a result of this, there are several cases to consider here: + // + // 1. We have no existing child policy (i.e., we have started up but + // have not yet received a serverlist from the balancer or gone + // into fallback mode; in this case, both child_policy_ and + // pending_child_policy_ are null). In this case, we create a + // new child policy and store it in child_policy_. + // + // 2. We have an existing child policy and have no pending child policy + // from a previous update (i.e., either there has not been a + // previous update that changed the policy name, or we have already + // finished swapping in the new policy; in this case, child_policy_ + // is non-null but pending_child_policy_ is null). In this case: + // a. If child_policy_->name() equals child_policy_name, then we + // update the existing child policy. + // b. If child_policy_->name() does not equal child_policy_name, + // we create a new policy. The policy will be stored in + // pending_child_policy_ and will later be swapped into + // child_policy_ by the helper when the new child transitions + // into state READY. + // + // 3. 
We have an existing child policy and have a pending child policy + // from a previous update (i.e., a previous update set + // pending_child_policy_ as per case 2b above and that policy has + // not yet transitioned into state READY and been swapped into + // child_policy_; in this case, both child_policy_ and + // pending_child_policy_ are non-null). In this case: + // a. If pending_child_policy_->name() equals child_policy_name, + // then we update the existing pending child policy. + // b. If pending_child_policy->name() does not equal + // child_policy_name, then we create a new policy. The new + // policy is stored in pending_child_policy_ (replacing the one + // that was there before, which will be immediately shut down) + // and will later be swapped into child_policy_ by the helper + // when the new child transitions into state READY. + const char* child_policy_name = child_policy_config_ == nullptr + ? "round_robin" + : child_policy_config_->name(); + const bool create_policy = + // case 1 + child_policy_ == nullptr || + // case 2b + (pending_child_policy_ == nullptr && + strcmp(child_policy_->name(), child_policy_name) != 0) || + // case 3b + (pending_child_policy_ != nullptr && + strcmp(pending_child_policy_->name(), child_policy_name) != 0); + LoadBalancingPolicy* policy_to_update = nullptr; + if (create_policy) { + // Cases 1, 2b, and 3b: create a new child policy. + // If child_policy_ is null, we set it (case 1), else we set + // pending_child_policy_ (cases 2b and 3b). if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this, - policy_to_update == pending_child_policy_.get() ? "pending " : "", - policy_to_update); + gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this, + child_policy_ == nullptr ? "" : "pending ", child_policy_name); } - policy_to_update->UpdateLocked(std::move(update_args)); + // Swap the policy into place. + auto& lb_policy = + child_policy_ == nullptr ? 
child_policy_ : pending_child_policy_; + lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args); + policy_to_update = lb_policy.get(); + } else { + // Cases 2a and 3a: update an existing policy. + // If we have a pending child policy, send the update to the pending + // policy (case 3a), else send it to the current policy (case 2a). + policy_to_update = pending_child_policy_ != nullptr + ? pending_child_policy_.get() + : child_policy_.get(); } + GPR_ASSERT(policy_to_update != nullptr); + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this, + policy_to_update == pending_child_policy_.get() ? "pending " : "", + policy_to_update); + } + policy_to_update->UpdateLocked(std::move(update_args)); +} - // - // factory - // +// +// factory +// - class GrpcLbFactory : public LoadBalancingPolicyFactory { - public: - OrphanablePtr CreateLoadBalancingPolicy( - LoadBalancingPolicy::Args args) const override { - return MakeOrphanable(std::move(args)); - } +class GrpcLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(std::move(args)); + } - const char* name() const override { return kGrpclb; } + const char* name() const override { return kGrpclb; } - RefCountedPtr ParseLoadBalancingConfig( - const grpc_json* json, grpc_error** error) const override { - GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); - if (json == nullptr) { - return RefCountedPtr( - New(nullptr)); - } - InlinedVector error_list; - RefCountedPtr child_policy; - for (const grpc_json* field = json->child; field != nullptr; - field = field->next) { - if (field->key == nullptr) continue; - if (strcmp(field->key, "childPolicy") == 0) { - if (child_policy != nullptr) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:childPolicy error:Duplicate entry")); - 
} - grpc_error* parse_error = GRPC_ERROR_NONE; - child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - field, &parse_error); - if (parse_error != GRPC_ERROR_NONE) { - error_list.push_back(parse_error); - } + RefCountedPtr ParseLoadBalancingConfig( + const grpc_json* json, grpc_error** error) const override { + GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); + if (json == nullptr) { + return RefCountedPtr( + New(nullptr)); + } + InlinedVector error_list; + RefCountedPtr child_policy; + for (const grpc_json* field = json->child; field != nullptr; + field = field->next) { + if (field->key == nullptr) continue; + if (strcmp(field->key, "childPolicy") == 0) { + if (child_policy != nullptr) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:childPolicy error:Duplicate entry")); + } + grpc_error* parse_error = GRPC_ERROR_NONE; + child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + field, &parse_error); + if (parse_error != GRPC_ERROR_NONE) { + error_list.push_back(parse_error); } - } - if (error_list.empty()) { - return RefCountedPtr( - New(std::move(child_policy))); - } else { - *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list); - return nullptr; } } - }; + if (error_list.empty()) { + return RefCountedPtr( + New(std::move(child_policy))); + } else { + *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list); + return nullptr; + } + } +}; } // namespace -} // namespace +} // namespace grpc_core // // Plugin registration diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 5cd2ea86c23..5fd88d956fa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -250,6 +250,7 @@ class XdsLb : public LoadBalancingPolicy { grpc_channel_args* CreateChildPolicyArgsLocked( const grpc_channel_args* args); + static void 
OnDelayedRemovalTimer(void* arg, grpc_error* error); static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); XdsLb* xds_policy() const { return locality_map_->xds_policy(); } @@ -299,6 +300,8 @@ class XdsLb : public LoadBalancingPolicy { private: void OnLocalityStateUpdateLocked(); void UpdateConnectivityStateLocked(); + static void OnDelayedRemovalTimer(void* arg, grpc_error* error); + static void OnFailoverTimer(void* arg, grpc_error* error); static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); static void OnFailoverTimerLocked(void* arg, grpc_error* error); @@ -378,6 +381,7 @@ class XdsLb : public LoadBalancingPolicy { // Methods for dealing with fallback state. void MaybeCancelFallbackAtStartupChecks(); + static void OnFallbackTimer(void* arg, grpc_error* error); static void OnFallbackTimerLocked(void* arg, grpc_error* error); void UpdateFallbackPolicyLocked(); OrphanablePtr CreateFallbackPolicyLocked( @@ -798,8 +802,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) { if (is_initial_update) { grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure - GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, - grpc_combiner_scheduler(combiner())); + GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this, + grpc_schedule_on_exec_ctx); fallback_at_startup_checks_pending_ = true; grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); } @@ -819,6 +823,14 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() { fallback_at_startup_checks_pending_ = false; } +void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + xdslb_policy->combiner()->Run( + GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_, + &XdsLb::OnFallbackTimerLocked, xdslb_policy, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { XdsLb* xdslb_policy = static_cast(arg); // If 
some fallback-at-startup check is done after the timer fires but before @@ -1103,10 +1115,9 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, xds_policy_.get(), priority_); } - GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, - this, grpc_combiner_scheduler(xds_policy_->combiner())); - GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimerLocked, this, - grpc_combiner_scheduler(xds_policy_->combiner())); + + GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, + grpc_schedule_on_exec_ctx); // Start the failover timer. Ref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked").release(); grpc_timer_init( @@ -1222,6 +1233,8 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { xds_policy(), priority_, xds_policy()->locality_retention_interval_ms_); } + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, + grpc_schedule_on_exec_ctx); grpc_timer_init( &delayed_removal_timer_, ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, @@ -1350,6 +1363,15 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { } } +void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( + void* arg, grpc_error* error) { + LocalityMap* self = static_cast<LocalityMap*>(arg); + self->xds_policy_->combiner()->Run( + GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, + OnDelayedRemovalTimerLocked, self, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); @@ -1359,13 +1381,13 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( const bool keep = self->priority_list_update().Contains(self->priority_) && self->priority_ <= priority_list->current_priority(); if (!keep) { - // This check is to make sure we always delete the locality maps from the - // lowest priority even if the closures of 
the back-to-back timers are not - // run in FIFO order. + // This check is to make sure we always delete the locality maps from + // the lowest priority even if the closures of the back-to-back timers + // are not run in FIFO order. // TODO(juanlishen): Eliminate unnecessary maintenance overhead for some // deactivated locality maps when out-of-order closures are run. - // TODO(juanlishen): Check the timer implementation to see if this defense - // is necessary. + // TODO(juanlishen): Check the timer implementation to see if this + // defense is necessary. if (self->priority_ == priority_list->LowestPriority()) { priority_list->priorities_.pop_back(); } else { @@ -1380,6 +1402,15 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); } +void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, + grpc_error* error) { + LocalityMap* self = static_cast<LocalityMap*>(arg); + self->xds_policy_->combiner()->Run( + GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, + self, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); @@ -1402,8 +1433,6 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality( gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), this, name_->AsHumanReadableString()); } - GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, - this, grpc_combiner_scheduler(xds_policy()->combiner())); } XdsLb::PriorityList::LocalityMap::Locality::~Locality() { @@ -1458,8 +1487,8 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created - // child policy. This will make the child policy progress upon activity on xDS - // LB, which in turn is tied to the application's call. + // child policy. 
This will make the child policy progress upon activity on + // xDS LB, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), xds_policy()->interested_parties()); return lb_policy; @@ -1529,8 +1558,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( // that was there before, which will be immediately shut down) // and will later be swapped into child_policy_ by the helper // when the new child transitions into state READY. - // TODO(juanlishen): If the child policy is not configured via service config, - // use whatever algorithm is specified by the balancer. + // TODO(juanlishen): If the child policy is not configured via service + // config, use whatever algorithm is specified by the balancer. const char* child_policy_name = xds_policy()->child_policy_config_ == nullptr ? "round_robin" @@ -1623,6 +1652,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { weight_ = 0; // Start a timer to delete the locality. Ref(DEBUG_LOCATION, "Locality+timer").release(); + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, + grpc_schedule_on_exec_ctx); grpc_timer_init( &delayed_removal_timer_, ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, @@ -1630,6 +1661,15 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { delayed_removal_timer_callback_pending_ = true; } +void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( + void* arg, grpc_error* error) { + Locality* self = static_cast<Locality*>(arg); + self->xds_policy()->combiner()->Run( + GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, + OnDelayedRemovalTimerLocked, self, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( void* arg, grpc_error* error) { Locality* self = static_cast(arg);