grpclb.cc changes

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent 3a33ed4762
commit a17bbbd840
  1. 31
      src/core/ext/filters/client_channel/client_channel.cc
  2. 590
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 4
      src/core/lib/iomgr/combiner.cc
  4. 4
      src/core/lib/iomgr/combiner.h

@ -1044,7 +1044,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
: parent_(std::move(parent)),
state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) {
parent_->parent_->chand_->combiner_->Exec(
parent_->parent_->chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_, ApplyUpdateInControlPlaneCombiner,
this, nullptr),
GRPC_ERROR_NONE);
@ -1140,7 +1140,7 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
chand_->combiner_->Exec(
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -1168,7 +1168,7 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) {
chand_->combiner_->Exec(
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -1182,7 +1182,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
}
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
chand_->combiner_->Exec(
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -1220,9 +1220,11 @@ class ChannelData::ConnectivityWatcherAdder {
initial_state_(initial_state),
watcher_(std::move(watcher)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
GRPC_CLOSURE_INIT(&closure_, &ConnectivityWatcherAdder::AddWatcherLocked,
this, grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherAdder::AddWatcherLocked, this,
nullptr),
GRPC_ERROR_NONE);
}
private:
@ -1252,10 +1254,11 @@ class ChannelData::ConnectivityWatcherRemover {
AsyncConnectivityStateWatcherInterface* watcher)
: chand_(chand), watcher_(watcher) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherRemover::RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherRemover::RemoveWatcherLocked,
this, nullptr),
GRPC_ERROR_NONE);
}
private:
@ -1880,7 +1883,7 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
// Pop into control plane combiner for remaining ops.
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
chand->combiner_->Exec(
chand->combiner_->Run(
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op, nullptr),
GRPC_ERROR_NONE);
@ -1949,8 +1952,8 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
grpc_connectivity_state out = state_tracker_.state();
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
combiner_->Exec(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, nullptr),
GRPC_ERROR_NONE);
combiner_->Run(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, nullptr),
GRPC_ERROR_NONE);
}
return out;
}

@ -174,6 +174,12 @@ class GrpcLb : public LoadBalancingPolicy {
static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
static void MaybeSendClientLoadReport(void* arg, grpc_error* error);
static void ClientLoadReportDone(void* arg, grpc_error* error);
static void OnInitialRequestSent(void* arg, grpc_error* error);
static void OnBalancerMessageReceived(void* arg, grpc_error* error);
static void OnBalancerStatusReceived(void* arg, grpc_error* error);
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
@ -312,17 +318,21 @@ class GrpcLb : public LoadBalancingPolicy {
// Helper functions used in UpdateLocked().
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
const grpc_channel_args& args);
static void OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error);
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
void CancelBalancerChannelConnectivityWatchLocked();
// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimer(void* arg, grpc_error* error);
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error* error);
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the child policy.
@ -783,14 +793,12 @@ GrpcLb::BalancerCallState::BalancerCallState(
// Init other data associated with the LB call.
grpc_metadata_array_init(&lb_initial_metadata_recv_);
grpc_metadata_array_init(&lb_trailing_metadata_recv_);
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
OnBalancerMessageReceivedLocked, this,
grpc_combiner_scheduler(grpclb_policy()->combiner()));
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
OnBalancerStatusReceivedLocked, this,
grpc_combiner_scheduler(grpclb_policy()->combiner()));
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx);
}
GrpcLb::BalancerCallState::~BalancerCallState() {
@ -894,14 +902,22 @@ void GrpcLb::BalancerCallState::StartQuery() {
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
const grpc_millis next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_,
MaybeSendClientLoadReportLocked, this,
grpc_combiner_scheduler(grpclb_policy()->combiner()));
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
&client_load_report_closure_);
client_load_report_timer_callback_pending_ = true;
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&client_load_report_closure_,
MaybeSendClientLoadReportLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
@ -966,8 +982,8 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_message_payload_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
@ -978,6 +994,15 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
}
}
void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone,
this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
@ -991,6 +1016,15 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
lb_calld->ScheduleNextClientLoadReportLocked();
}
void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_,
OnInitialRequestSentLocked, this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
@ -1006,6 +1040,15 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceived(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
OnBalancerMessageReceivedLocked, this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
@ -1150,6 +1193,14 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
}
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
OnBalancerStatusReceivedLocked, this, nullptr));
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
@ -1313,11 +1364,11 @@ GrpcLb::GrpcLb(Args args)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
&GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1482,9 +1533,18 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
response_generator_->SetResponse(std::move(result));
}
void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
self->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
@ -1561,278 +1621,294 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
this, grpc_combiner_scheduler(combiner()));
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->retry_timer_callback_pending_ = false;
if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
grpclb_policy->lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
grpclb_policy);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_call_retry_,
&GrpcLb::OnBalancerCallRetryTimerLocked, this, nullptr),
GRPC_ERROR_REF(error));
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->retry_timer_callback_pending_ = false;
if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
grpclb_policy->lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
grpclb_policy);
}
grpclb_policy->StartBalancerCallLocked();
}
grpclb_policy->StartBalancerCallLocked();
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
//
// code for handling fallback mode
//
//
// code for handling fallback mode
//
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
}
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->fallback_at_startup_checks_pending_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
//
// code for interacting with the child policy
//
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->fallback_at_startup_checks_pending_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
//
// code for interacting with the child policy
//
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
const_cast<char*>(
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
}
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
}
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
name, lb_policy.get());
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
name, lb_policy.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
update_args.addresses = fallback_backend_addresses_;
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
}
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
update_args.addresses = fallback_backend_addresses_;
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
}
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
policy_to_update->UpdateLocked(std::move(update_args));
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(update_args));
}
//
// factory
//
//
// factory
//
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<GrpcLb>(std::move(args));
}
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<GrpcLb>(std::move(args));
}
const char* name() const override { return kGrpclb; }
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:Duplicate entry"));
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(parse_error);
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:Duplicate entry"));
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(parse_error);
}
}
}
if (error_list.empty()) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(std::move(child_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;
}
}
if (error_list.empty()) {
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(std::move(child_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;
}
}
};
};
} // namespace
} // namespace grpc_core
} // namespace
//
// Plugin registration

@ -341,11 +341,11 @@ static void enqueue_finally(void* closure, grpc_error* error) {
GRPC_ERROR_REF(error));
}
static void Combiner::Exec(grpc_closure* closure, grpc_error* error) {
static void Combiner::Run(grpc_closure* closure, grpc_error* error) {
combiner_exec(combiner, closure, error);
}
static void Combiner::FinallyExec(grpc_closure* closure, grpc_error* error) {
static void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) {
combiner_finally_exec(combiner, closure, exec);
}
} // namespace grpc_core

@ -62,8 +62,8 @@ extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace;
namespace grpc_core {
class Combiner {
public:
static void Exec(grpc_closure* closure, grpc_error* error);
static void FinallyExec(grpc_closure* closure, grpc_error* error);
static void Run(grpc_closure* closure, grpc_error* error);
static void FinallyRun(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;

Loading…
Cancel
Save