|
|
|
@ -53,6 +53,8 @@ |
|
|
|
|
|
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/event_engine/event_engine.h> |
|
|
|
|
|
|
|
|
|
// IWYU pragma: no_include <sys/socket.h>
|
|
|
|
|
|
|
|
|
|
#include <inttypes.h> |
|
|
|
@ -76,6 +78,8 @@ |
|
|
|
|
#include "absl/strings/str_join.h" |
|
|
|
|
#include "absl/strings/string_view.h" |
|
|
|
|
#include "absl/strings/strip.h" |
|
|
|
|
#include "absl/time/clock.h" |
|
|
|
|
#include "absl/time/time.h" |
|
|
|
|
#include "absl/types/optional.h" |
|
|
|
|
#include "absl/types/variant.h" |
|
|
|
|
#include "upb/upb.hpp" |
|
|
|
@ -109,6 +113,7 @@ |
|
|
|
|
#include "src/core/lib/channel/channelz.h" |
|
|
|
|
#include "src/core/lib/config/core_configuration.h" |
|
|
|
|
#include "src/core/lib/debug/trace.h" |
|
|
|
|
#include "src/core/lib/event_engine/event_engine_factory.h" |
|
|
|
|
#include "src/core/lib/gpr/string.h" |
|
|
|
|
#include "src/core/lib/gpr/useful.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
@ -155,6 +160,9 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb"; |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
using ::grpc_event_engine::experimental::EventEngine; |
|
|
|
|
using ::grpc_event_engine::experimental::GetDefaultEventEngine; |
|
|
|
|
|
|
|
|
|
constexpr char kGrpclb[] = "grpclb"; |
|
|
|
|
|
|
|
|
|
class GrpcLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
@ -212,13 +220,15 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
void ScheduleNextClientLoadReportLocked(); |
|
|
|
|
void SendClientLoadReportLocked(); |
|
|
|
|
|
|
|
|
|
static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error); |
|
|
|
|
// EventEngine callbacks
|
|
|
|
|
void MaybeSendClientLoadReport(); |
|
|
|
|
void MaybeSendClientLoadReportLocked(); |
|
|
|
|
|
|
|
|
|
static void ClientLoadReportDone(void* arg, grpc_error_handle error); |
|
|
|
|
static void OnInitialRequestSent(void* arg, grpc_error_handle error); |
|
|
|
|
static void OnBalancerMessageReceived(void* arg, grpc_error_handle error); |
|
|
|
|
static void OnBalancerStatusReceived(void* arg, grpc_error_handle error); |
|
|
|
|
|
|
|
|
|
void MaybeSendClientLoadReportLocked(grpc_error_handle error); |
|
|
|
|
void ClientLoadReportDoneLocked(grpc_error_handle error); |
|
|
|
|
void OnInitialRequestSentLocked(); |
|
|
|
|
void OnBalancerMessageReceivedLocked(); |
|
|
|
@ -253,13 +263,11 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
// Created after the first serverlist is received.
|
|
|
|
|
RefCountedPtr<GrpcLbClientStats> client_stats_; |
|
|
|
|
Duration client_stats_report_interval_; |
|
|
|
|
grpc_timer client_load_report_timer_; |
|
|
|
|
bool client_load_report_timer_callback_pending_ = false; |
|
|
|
|
absl::optional<EventEngine::TaskHandle> client_load_report_handle_; |
|
|
|
|
bool last_client_load_report_counters_were_zero_ = false; |
|
|
|
|
bool client_load_report_is_due_ = false; |
|
|
|
|
// The closure used for either the load report timer or the callback for
|
|
|
|
|
// completion of sending the load report.
|
|
|
|
|
grpc_closure client_load_report_closure_; |
|
|
|
|
// The closure used for the completion of sending the load report.
|
|
|
|
|
grpc_closure client_load_report_done_closure_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class SubchannelWrapper : public DelegatingSubchannel { |
|
|
|
@ -823,7 +831,7 @@ GrpcLb::BalancerCallState::BalancerCallState( |
|
|
|
|
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, |
|
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, |
|
|
|
|
GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone, |
|
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
|
const Timestamp deadline = |
|
|
|
|
grpclb_policy()->lb_call_timeout_ == Duration::Zero() |
|
|
|
@ -866,8 +874,9 @@ void GrpcLb::BalancerCallState::Orphan() { |
|
|
|
|
// up. Otherwise, we are here because grpclb_policy has to orphan a failed
|
|
|
|
|
// call, then the following cancellation will be a no-op.
|
|
|
|
|
grpc_call_cancel_internal(lb_call_); |
|
|
|
|
if (client_load_report_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&client_load_report_timer_); |
|
|
|
|
if (client_load_report_handle_.has_value() && |
|
|
|
|
GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { |
|
|
|
|
Unref(DEBUG_LOCATION, "client_load_report cancelled"); |
|
|
|
|
} |
|
|
|
|
// Note that the initial ref is hold by lb_on_balancer_status_received_
|
|
|
|
|
// instead of the caller of this function. So the corresponding unref happens
|
|
|
|
@ -951,34 +960,20 @@ void GrpcLb::BalancerCallState::StartQuery() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
|
|
|
|
// InvalidateNow to avoid getting stuck re-initializing this timer
|
|
|
|
|
// in a loop while draining the currently-held WorkSerializer.
|
|
|
|
|
// Also see https://github.com/grpc/grpc/issues/26079.
|
|
|
|
|
ExecCtx::Get()->InvalidateNow(); |
|
|
|
|
const Timestamp next_client_load_report_time = |
|
|
|
|
ExecCtx::Get()->Now() + client_stats_report_interval_; |
|
|
|
|
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, |
|
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, |
|
|
|
|
&client_load_report_closure_); |
|
|
|
|
client_load_report_timer_callback_pending_ = true; |
|
|
|
|
client_load_report_handle_ = GetDefaultEventEngine()->RunAt( |
|
|
|
|
absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()), |
|
|
|
|
[this] { MaybeSendClientLoadReport(); }); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport( |
|
|
|
|
void* arg, grpc_error_handle error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
(void)GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
lb_calld->grpclb_policy()->work_serializer()->Run( |
|
|
|
|
[lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() { |
|
|
|
|
grpclb_policy()->work_serializer()->Run( |
|
|
|
|
[this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( |
|
|
|
|
grpc_error_handle error) { |
|
|
|
|
client_load_report_timer_callback_pending_ = false; |
|
|
|
|
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) { |
|
|
|
|
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { |
|
|
|
|
client_load_report_handle_.reset(); |
|
|
|
|
if (this != grpclb_policy()->lb_calld_.get()) { |
|
|
|
|
Unref(DEBUG_LOCATION, "client_load_report"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If we've already sent the initial request, then we can go ahead and send
|
|
|
|
@ -1031,10 +1026,8 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_SEND_MESSAGE; |
|
|
|
|
op.data.send_message.send_message = send_message_payload_; |
|
|
|
|
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
lb_call_, &op, 1, &client_load_report_closure_); |
|
|
|
|
lb_call_, &op, 1, &client_load_report_done_closure_); |
|
|
|
|
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[grpclb %p] lb_calld=%p call_error=%d sending client load report", |
|
|
|
|