diff --git a/BUILD b/BUILD index 3d985a67d13..b4cd98e6da8 100644 --- a/BUILD +++ b/BUILD @@ -3497,13 +3497,12 @@ grpc_cc_library( "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", ], external_deps = [ - "absl/container:inlined_vector", "absl/memory", + "absl/container:inlined_vector", "absl/status", "absl/status:statusor", "absl/strings", "absl/strings:str_format", - "absl/time", "absl/types:optional", "absl/types:variant", "upb_lib", @@ -3514,7 +3513,6 @@ grpc_cc_library( "channel_stack_type", "config", "debug_location", - "default_event_engine_factory_hdrs", "error", "gpr_base", "gpr_codegen", diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index c184b26b135..7e6065e1390 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -53,8 +53,6 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" -#include - // IWYU pragma: no_include #include @@ -78,8 +76,6 @@ #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" -#include "absl/time/clock.h" -#include "absl/time/time.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include "upb/upb.hpp" @@ -113,7 +109,6 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -160,9 +155,6 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb"; namespace { -using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - constexpr char kGrpclb[] = "grpclb"; class GrpcLbConfig : public LoadBalancingPolicy::Config { @@ -220,15 +212,13 @@ class GrpcLb : public LoadBalancingPolicy { void ScheduleNextClientLoadReportLocked(); void SendClientLoadReportLocked(); - // EventEngine callbacks - void MaybeSendClientLoadReport(); - void MaybeSendClientLoadReportLocked(); - + static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error); static void ClientLoadReportDone(void* arg, grpc_error_handle error); static void OnInitialRequestSent(void* arg, grpc_error_handle error); static void OnBalancerMessageReceived(void* arg, grpc_error_handle error); static void OnBalancerStatusReceived(void* arg, grpc_error_handle error); + void MaybeSendClientLoadReportLocked(grpc_error_handle error); void ClientLoadReportDoneLocked(grpc_error_handle error); void OnInitialRequestSentLocked(); void OnBalancerMessageReceivedLocked(); @@ -263,11 +253,13 @@ class GrpcLb : public LoadBalancingPolicy { // Created after the first serverlist is received. RefCountedPtr client_stats_; Duration client_stats_report_interval_; - absl::optional client_load_report_handle_; + grpc_timer client_load_report_timer_; + bool client_load_report_timer_callback_pending_ = false; bool last_client_load_report_counters_were_zero_ = false; bool client_load_report_is_due_ = false; - // The closure used for the completion of sending the load report. - grpc_closure client_load_report_done_closure_; + // The closure used for either the load report timer or the callback for + // completion of sending the load report. + grpc_closure client_load_report_closure_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -831,7 +823,7 @@ GrpcLb::BalancerCallState::BalancerCallState( OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone, + GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, this, grpc_schedule_on_exec_ctx); const Timestamp deadline = grpclb_policy()->lb_call_timeout_ == Duration::Zero() @@ -874,9 +866,8 @@ void GrpcLb::BalancerCallState::Orphan() { // up. Otherwise, we are here because grpclb_policy has to orphan a failed // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); - if (client_load_report_handle_.has_value() && - GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { - Unref(DEBUG_LOCATION, "client_load_report cancelled"); + if (client_load_report_timer_callback_pending_) { + grpc_timer_cancel(&client_load_report_timer_); } // Note that the initial ref is hold by lb_on_balancer_status_received_ // instead of the caller of this function. So the corresponding unref happens @@ -960,20 +951,34 @@ void GrpcLb::BalancerCallState::StartQuery() { } void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { - client_load_report_handle_ = GetDefaultEventEngine()->RunAt( - absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()), - [this] { MaybeSendClientLoadReport(); }); + // InvalidateNow to avoid getting stuck re-initializing this timer + // in a loop while draining the currently-held WorkSerializer. + // Also see https://github.com/grpc/grpc/issues/26079. + ExecCtx::Get()->InvalidateNow(); + const Timestamp next_client_load_report_time = + ExecCtx::Get()->Now() + client_stats_report_interval_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, + &client_load_report_closure_); + client_load_report_timer_callback_pending_ = true; } -void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() { - grpclb_policy()->work_serializer()->Run( - [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); +void GrpcLb::BalancerCallState::MaybeSendClientLoadReport( + void* arg, grpc_error_handle error) { + BalancerCallState* lb_calld = static_cast(arg); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + lb_calld->grpclb_policy()->work_serializer()->Run( + [lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); }, + DEBUG_LOCATION); } -void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { - client_load_report_handle_.reset(); - if (this != grpclb_policy()->lb_calld_.get()) { +void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( + grpc_error_handle error) { + client_load_report_timer_callback_pending_ = false; + if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) { Unref(DEBUG_LOCATION, "client_load_report"); + GRPC_ERROR_UNREF(error); return; } // If we've already sent the initial request, then we can go ahead and send @@ -1026,8 +1031,10 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = send_message_payload_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this, + grpc_schedule_on_exec_ctx); grpc_call_error call_error = grpc_call_start_batch_and_execute( - lb_call_, &op, 1, &client_load_report_done_closure_); + lb_call_, &op, 1, &client_load_report_closure_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { gpr_log(GPR_ERROR, "[grpclb %p] lb_calld=%p call_error=%d sending client load report",