|
|
@ -72,6 +72,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
#include "absl/container/inlined_vector.h" |
|
|
|
#include "absl/container/inlined_vector.h" |
|
|
|
#include "absl/functional/function_ref.h" |
|
|
|
#include "absl/functional/function_ref.h" |
|
|
|
|
|
|
|
#include "absl/log/check.h" |
|
|
|
#include "absl/status/status.h" |
|
|
|
#include "absl/status/status.h" |
|
|
|
#include "absl/status/statusor.h" |
|
|
|
#include "absl/status/statusor.h" |
|
|
|
#include "absl/strings/str_cat.h" |
|
|
|
#include "absl/strings/str_cat.h" |
|
|
@ -884,8 +885,8 @@ GrpcLb::BalancerCallState::BalancerCallState( |
|
|
|
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" |
|
|
|
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" |
|
|
|
: nullptr), |
|
|
|
: nullptr), |
|
|
|
grpclb_policy_(std::move(parent_grpclb_policy)) { |
|
|
|
grpclb_policy_(std::move(parent_grpclb_policy)) { |
|
|
|
GPR_ASSERT(grpclb_policy_ != nullptr); |
|
|
|
CHECK(grpclb_policy_ != nullptr); |
|
|
|
GPR_ASSERT(!grpclb_policy()->shutting_down_); |
|
|
|
CHECK(!grpclb_policy()->shutting_down_); |
|
|
|
// Init the LB call. Note that the LB call will progress every time there's
|
|
|
|
// Init the LB call. Note that the LB call will progress every time there's
|
|
|
|
// activity in grpclb_policy_->interested_parties(), which is comprised of
|
|
|
|
// activity in grpclb_policy_->interested_parties(), which is comprised of
|
|
|
|
// the polling entities from client_channel.
|
|
|
|
// the polling entities from client_channel.
|
|
|
@ -922,7 +923,7 @@ GrpcLb::BalancerCallState::BalancerCallState( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
GrpcLb::BalancerCallState::~BalancerCallState() { |
|
|
|
GrpcLb::BalancerCallState::~BalancerCallState() { |
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
CHECK_NE(lb_call_, nullptr); |
|
|
|
grpc_call_unref(lb_call_); |
|
|
|
grpc_call_unref(lb_call_); |
|
|
|
grpc_metadata_array_destroy(&lb_initial_metadata_recv_); |
|
|
|
grpc_metadata_array_destroy(&lb_initial_metadata_recv_); |
|
|
|
grpc_metadata_array_destroy(&lb_trailing_metadata_recv_); |
|
|
|
grpc_metadata_array_destroy(&lb_trailing_metadata_recv_); |
|
|
@ -932,7 +933,7 @@ GrpcLb::BalancerCallState::~BalancerCallState() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::Orphan() { |
|
|
|
void GrpcLb::BalancerCallState::Orphan() { |
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
CHECK_NE(lb_call_, nullptr); |
|
|
|
// If we are here because grpclb_policy wants to cancel the call,
|
|
|
|
// If we are here because grpclb_policy wants to cancel the call,
|
|
|
|
// lb_on_balancer_status_received_ will complete the cancellation and clean
|
|
|
|
// lb_on_balancer_status_received_ will complete the cancellation and clean
|
|
|
|
// up. Otherwise, we are here because grpclb_policy has to orphan a failed
|
|
|
|
// up. Otherwise, we are here because grpclb_policy has to orphan a failed
|
|
|
@ -949,7 +950,7 @@ void GrpcLb::BalancerCallState::Orphan() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
CHECK_NE(lb_call_, nullptr); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", |
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", |
|
|
|
grpclb_policy_.get(), this, lb_call_); |
|
|
|
grpclb_policy_.get(), this, lb_call_); |
|
|
@ -967,7 +968,7 @@ void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
op->reserved = nullptr; |
|
|
|
op->reserved = nullptr; |
|
|
|
op++; |
|
|
|
op++; |
|
|
|
// Op: send request message.
|
|
|
|
// Op: send request message.
|
|
|
|
GPR_ASSERT(send_message_payload_ != nullptr); |
|
|
|
CHECK_NE(send_message_payload_, nullptr); |
|
|
|
op->op = GRPC_OP_SEND_MESSAGE; |
|
|
|
op->op = GRPC_OP_SEND_MESSAGE; |
|
|
|
op->data.send_message.send_message = send_message_payload_; |
|
|
|
op->data.send_message.send_message = send_message_payload_; |
|
|
|
op->flags = 0; |
|
|
|
op->flags = 0; |
|
|
@ -981,7 +982,7 @@ void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
call_error = grpc_call_start_batch_and_execute(lb_call_, ops, |
|
|
|
call_error = grpc_call_start_batch_and_execute(lb_call_, ops, |
|
|
|
static_cast<size_t>(op - ops), |
|
|
|
static_cast<size_t>(op - ops), |
|
|
|
&lb_on_initial_request_sent_); |
|
|
|
&lb_on_initial_request_sent_); |
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
// Op: recv initial metadata.
|
|
|
|
// Op: recv initial metadata.
|
|
|
|
op = ops; |
|
|
|
op = ops; |
|
|
|
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
|
|
|
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
|
|
@ -1004,7 +1005,7 @@ void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
lb_call_, ops, static_cast<size_t>(op - ops), |
|
|
|
lb_call_, ops, static_cast<size_t>(op - ops), |
|
|
|
&lb_on_balancer_message_received_); |
|
|
|
&lb_on_balancer_message_received_); |
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
// Op: recv server status.
|
|
|
|
// Op: recv server status.
|
|
|
|
op = ops; |
|
|
|
op = ops; |
|
|
|
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
|
|
|
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
|
|
@ -1021,7 +1022,7 @@ void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
lb_call_, ops, static_cast<size_t>(op - ops), |
|
|
|
lb_call_, ops, static_cast<size_t>(op - ops), |
|
|
|
&lb_on_balancer_status_received_); |
|
|
|
&lb_on_balancer_status_received_); |
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
|
|
|
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
|
|
@ -1053,7 +1054,7 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
// Construct message payload.
|
|
|
|
// Construct message payload.
|
|
|
|
GPR_ASSERT(send_message_payload_ == nullptr); |
|
|
|
CHECK_EQ(send_message_payload_, nullptr); |
|
|
|
// Get snapshot of stats.
|
|
|
|
// Get snapshot of stats.
|
|
|
|
int64_t num_calls_started; |
|
|
|
int64_t num_calls_started; |
|
|
|
int64_t num_calls_finished; |
|
|
|
int64_t num_calls_finished; |
|
|
@ -1097,7 +1098,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"[grpclb %p] lb_calld=%p call_error=%d sending client load report", |
|
|
|
"[grpclb %p] lb_calld=%p call_error=%d sending client load report", |
|
|
|
grpclb_policy_.get(), this, call_error); |
|
|
|
grpclb_policy_.get(), this, call_error); |
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1197,7 +1198,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
case response.SERVERLIST: { |
|
|
|
case response.SERVERLIST: { |
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
CHECK_NE(lb_call_, nullptr); |
|
|
|
auto serverlist_wrapper = |
|
|
|
auto serverlist_wrapper = |
|
|
|
MakeRefCounted<Serverlist>(std::move(response.serverlist)); |
|
|
|
MakeRefCounted<Serverlist>(std::move(response.serverlist)); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
@ -1302,7 +1303,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { |
|
|
|
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
|
|
|
|
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
lb_call_, &op, 1, &lb_on_balancer_message_received_); |
|
|
|
lb_call_, &op, 1, &lb_on_balancer_message_received_); |
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown"); |
|
|
|
Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown"); |
|
|
|
} |
|
|
|
} |
|
|
@ -1318,7 +1319,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceived( |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
grpc_error_handle error) { |
|
|
|
grpc_error_handle error) { |
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
CHECK_NE(lb_call_, nullptr); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
char* status_details = grpc_slice_to_c_string(lb_call_status_details_); |
|
|
|
char* status_details = grpc_slice_to_c_string(lb_call_status_details_); |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
@ -1337,7 +1338,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
// case.
|
|
|
|
// case.
|
|
|
|
grpclb_policy()->lb_calld_.reset(); |
|
|
|
grpclb_policy()->lb_calld_.reset(); |
|
|
|
if (grpclb_policy()->fallback_at_startup_checks_pending_) { |
|
|
|
if (grpclb_policy()->fallback_at_startup_checks_pending_) { |
|
|
|
GPR_ASSERT(!seen_serverlist_); |
|
|
|
CHECK(!seen_serverlist_); |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"[grpclb %p] Balancer call finished without receiving " |
|
|
|
"[grpclb %p] Balancer call finished without receiving " |
|
|
|
"serverlist; entering fallback mode", |
|
|
|
"serverlist; entering fallback mode", |
|
|
@ -1352,7 +1353,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
// This handles the fallback-after-startup case.
|
|
|
|
// This handles the fallback-after-startup case.
|
|
|
|
grpclb_policy()->MaybeEnterFallbackModeAfterStartup(); |
|
|
|
grpclb_policy()->MaybeEnterFallbackModeAfterStartup(); |
|
|
|
} |
|
|
|
} |
|
|
|
GPR_ASSERT(!grpclb_policy()->shutting_down_); |
|
|
|
CHECK(!grpclb_policy()->shutting_down_); |
|
|
|
grpclb_policy()->channel_control_helper()->RequestReresolution(); |
|
|
|
grpclb_policy()->channel_control_helper()->RequestReresolution(); |
|
|
|
if (seen_initial_response_) { |
|
|
|
if (seen_initial_response_) { |
|
|
|
// If we lose connection to the LB server, reset the backoff and restart
|
|
|
|
// If we lose connection to the LB server, reset the backoff and restart
|
|
|
@ -1508,7 +1509,7 @@ void GrpcLb::ShutdownLocked() { |
|
|
|
if (lb_channel_ != nullptr) { |
|
|
|
if (lb_channel_ != nullptr) { |
|
|
|
if (parent_channelz_node_ != nullptr) { |
|
|
|
if (parent_channelz_node_ != nullptr) { |
|
|
|
channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node(); |
|
|
|
channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node(); |
|
|
|
GPR_ASSERT(child_channelz_node != nullptr); |
|
|
|
CHECK_NE(child_channelz_node, nullptr); |
|
|
|
parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid()); |
|
|
|
parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid()); |
|
|
|
} |
|
|
|
} |
|
|
|
lb_channel_.reset(); |
|
|
|
lb_channel_.reset(); |
|
|
@ -1560,7 +1561,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { |
|
|
|
} |
|
|
|
} |
|
|
|
const bool is_initial_update = lb_channel_ == nullptr; |
|
|
|
const bool is_initial_update = lb_channel_ == nullptr; |
|
|
|
config_ = args.config.TakeAsSubclass<GrpcLbConfig>(); |
|
|
|
config_ = args.config.TakeAsSubclass<GrpcLbConfig>(); |
|
|
|
GPR_ASSERT(config_ != nullptr); |
|
|
|
CHECK(config_ != nullptr); |
|
|
|
args_ = std::move(args.args); |
|
|
|
args_ = std::move(args.args); |
|
|
|
// Update fallback address list.
|
|
|
|
// Update fallback address list.
|
|
|
|
if (!args.addresses.ok()) { |
|
|
|
if (!args.addresses.ok()) { |
|
|
@ -1634,7 +1635,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() { |
|
|
|
lb_channel_.reset(Channel::FromC( |
|
|
|
lb_channel_.reset(Channel::FromC( |
|
|
|
grpc_channel_create(uri_str.c_str(), channel_credentials.get(), |
|
|
|
grpc_channel_create(uri_str.c_str(), channel_credentials.get(), |
|
|
|
lb_channel_args.ToC().get()))); |
|
|
|
lb_channel_args.ToC().get()))); |
|
|
|
GPR_ASSERT(lb_channel_ != nullptr); |
|
|
|
CHECK(lb_channel_ != nullptr); |
|
|
|
// Set up channelz linkage.
|
|
|
|
// Set up channelz linkage.
|
|
|
|
channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node(); |
|
|
|
channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node(); |
|
|
|
auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>(); |
|
|
|
auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>(); |
|
|
@ -1664,10 +1665,10 @@ void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { |
|
|
|
//
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::StartBalancerCallLocked() { |
|
|
|
void GrpcLb::StartBalancerCallLocked() { |
|
|
|
GPR_ASSERT(lb_channel_ != nullptr); |
|
|
|
CHECK(lb_channel_ != nullptr); |
|
|
|
if (shutting_down_) return; |
|
|
|
if (shutting_down_) return; |
|
|
|
// Init the LB call data.
|
|
|
|
// Init the LB call data.
|
|
|
|
GPR_ASSERT(lb_calld_ == nullptr); |
|
|
|
CHECK(lb_calld_ == nullptr); |
|
|
|
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); |
|
|
|
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
@ -1826,7 +1827,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
} |
|
|
|
} |
|
|
|
update_args.args = |
|
|
|
update_args.args = |
|
|
|
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); |
|
|
|
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); |
|
|
|
GPR_ASSERT(update_args.args != ChannelArgs()); |
|
|
|
CHECK(update_args.args != ChannelArgs()); |
|
|
|
update_args.config = config_->child_policy(); |
|
|
|
update_args.config = config_->child_policy(); |
|
|
|
// Create child policy if needed.
|
|
|
|
// Create child policy if needed.
|
|
|
|
if (child_policy_ == nullptr) { |
|
|
|
if (child_policy_ == nullptr) { |
|
|
@ -1856,7 +1857,7 @@ void GrpcLb::CacheDeletedSubchannelLocked( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void GrpcLb::StartSubchannelCacheTimerLocked() { |
|
|
|
void GrpcLb::StartSubchannelCacheTimerLocked() { |
|
|
|
GPR_ASSERT(!cached_subchannels_.empty()); |
|
|
|
CHECK(!cached_subchannels_.empty()); |
|
|
|
subchannel_cache_timer_handle_ = |
|
|
|
subchannel_cache_timer_handle_ = |
|
|
|
channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
cached_subchannels_.begin()->first - Timestamp::Now(), |
|
|
|
cached_subchannels_.begin()->first - Timestamp::Now(), |
|
|
|