diff --git a/BUILD b/BUILD index d6089ed4eaa..0dd4274e90d 100644 --- a/BUILD +++ b/BUILD @@ -3498,12 +3498,13 @@ grpc_cc_library( "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", ], external_deps = [ - "absl/memory", "absl/container:inlined_vector", + "absl/memory", "absl/status", "absl/status:statusor", "absl/strings", "absl/strings:str_format", + "absl/time", "absl/types:optional", "absl/types:variant", "upb_lib", @@ -3514,6 +3515,7 @@ grpc_cc_library( "channel_stack_type", "config", "debug_location", + "default_event_engine_factory_hdrs", "error", "gpr_base", "gpr_codegen", diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 7e6065e1390..17c964d6849 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -53,6 +53,8 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" +#include + // IWYU pragma: no_include #include @@ -76,6 +78,8 @@ #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include "upb/upb.hpp" @@ -109,6 +113,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -155,6 +160,9 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb"; namespace { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + constexpr char kGrpclb[] = "grpclb"; class GrpcLbConfig : public LoadBalancingPolicy::Config { @@ -212,13 +220,15 @@ class GrpcLb : public LoadBalancingPolicy { void ScheduleNextClientLoadReportLocked(); void SendClientLoadReportLocked(); - static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error); + // EventEngine callbacks + void MaybeSendClientLoadReport(); + void MaybeSendClientLoadReportLocked(); + static void ClientLoadReportDone(void* arg, grpc_error_handle error); static void OnInitialRequestSent(void* arg, grpc_error_handle error); static void OnBalancerMessageReceived(void* arg, grpc_error_handle error); static void OnBalancerStatusReceived(void* arg, grpc_error_handle error); - void MaybeSendClientLoadReportLocked(grpc_error_handle error); void ClientLoadReportDoneLocked(grpc_error_handle error); void OnInitialRequestSentLocked(); void OnBalancerMessageReceivedLocked(); @@ -253,13 +263,11 @@ class GrpcLb : public LoadBalancingPolicy { // Created after the first serverlist is received. RefCountedPtr client_stats_; Duration client_stats_report_interval_; - grpc_timer client_load_report_timer_; - bool client_load_report_timer_callback_pending_ = false; + absl::optional client_load_report_handle_; bool last_client_load_report_counters_were_zero_ = false; bool client_load_report_is_due_ = false; - // The closure used for either the load report timer or the callback for - // completion of sending the load report. - grpc_closure client_load_report_closure_; + // The closure used for the completion of sending the load report. + grpc_closure client_load_report_done_closure_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -823,7 +831,7 @@ GrpcLb::BalancerCallState::BalancerCallState( OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, + GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone, this, grpc_schedule_on_exec_ctx); const Timestamp deadline = grpclb_policy()->lb_call_timeout_ == Duration::Zero() @@ -866,8 +874,9 @@ void GrpcLb::BalancerCallState::Orphan() { // up. Otherwise, we are here because grpclb_policy has to orphan a failed // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); - if (client_load_report_timer_callback_pending_) { - grpc_timer_cancel(&client_load_report_timer_); + if (client_load_report_handle_.has_value() && + GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { + Unref(DEBUG_LOCATION, "client_load_report cancelled"); } // Note that the initial ref is hold by lb_on_balancer_status_received_ // instead of the caller of this function. So the corresponding unref happens @@ -951,34 +960,24 @@ void GrpcLb::BalancerCallState::StartQuery() { } void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { - // InvalidateNow to avoid getting stuck re-initializing this timer - // in a loop while draining the currently-held WorkSerializer. - // Also see https://github.com/grpc/grpc/issues/26079. - ExecCtx::Get()->InvalidateNow(); - const Timestamp next_client_load_report_time = - ExecCtx::Get()->Now() + client_stats_report_interval_; - GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, - this, grpc_schedule_on_exec_ctx); - grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, - &client_load_report_closure_); - client_load_report_timer_callback_pending_ = true; + client_load_report_handle_ = GetDefaultEventEngine()->RunAt( + absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()), + [this] { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + MaybeSendClientLoadReport(); + }); } -void GrpcLb::BalancerCallState::MaybeSendClientLoadReport( - void* arg, grpc_error_handle error) { - BalancerCallState* lb_calld = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - lb_calld->grpclb_policy()->work_serializer()->Run( - [lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); }, - DEBUG_LOCATION); +void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() { + grpclb_policy()->work_serializer()->Run( + [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); } -void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( - grpc_error_handle error) { - client_load_report_timer_callback_pending_ = false; - if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) { +void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { + client_load_report_handle_.reset(); + if (this != grpclb_policy()->lb_calld_.get()) { Unref(DEBUG_LOCATION, "client_load_report"); - GRPC_ERROR_UNREF(error); return; } // If we've already sent the initial request, then we can go ahead and send @@ -1031,10 +1030,8 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = send_message_payload_; - GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this, - grpc_schedule_on_exec_ctx); grpc_call_error call_error = grpc_call_start_batch_and_execute( - lb_call_, &op, 1, &client_load_report_closure_); + lb_call_, &op, 1, &client_load_report_done_closure_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { gpr_log(GPR_ERROR, "[grpclb %p] lb_calld=%p call_error=%d sending client load report",