Revert "EventEngine::RunAt - grpclb load report timer (#29719)" (#29821)

This reverts commit cdd49fc541.
pull/29824/head
Vignesh Babu 3 years ago committed by GitHub
parent cdd49fc541
commit 18307587b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 65
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -3497,13 +3497,12 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
], ],
external_deps = [ external_deps = [
"absl/container:inlined_vector",
"absl/memory", "absl/memory",
"absl/container:inlined_vector",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
"absl/strings:str_format", "absl/strings:str_format",
"absl/time",
"absl/types:optional", "absl/types:optional",
"absl/types:variant", "absl/types:variant",
"upb_lib", "upb_lib",
@ -3514,7 +3513,6 @@ grpc_cc_library(
"channel_stack_type", "channel_stack_type",
"config", "config",
"debug_location", "debug_location",
"default_event_engine_factory_hdrs",
"error", "error",
"gpr_base", "gpr_base",
"gpr_codegen", "gpr_codegen",

@ -53,8 +53,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include <grpc/event_engine/event_engine.h>
// IWYU pragma: no_include <sys/socket.h> // IWYU pragma: no_include <sys/socket.h>
#include <inttypes.h> #include <inttypes.h>
@ -78,8 +76,6 @@
#include "absl/strings/str_join.h" #include "absl/strings/str_join.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/strings/strip.h" #include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "absl/types/variant.h" #include "absl/types/variant.h"
#include "upb/upb.hpp" #include "upb/upb.hpp"
@ -113,7 +109,6 @@
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
@ -160,9 +155,6 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr char kGrpclb[] = "grpclb"; constexpr char kGrpclb[] = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config { class GrpcLbConfig : public LoadBalancingPolicy::Config {
@ -220,15 +212,13 @@ class GrpcLb : public LoadBalancingPolicy {
void ScheduleNextClientLoadReportLocked(); void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked(); void SendClientLoadReportLocked();
// EventEngine callbacks static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error);
void MaybeSendClientLoadReport();
void MaybeSendClientLoadReportLocked();
static void ClientLoadReportDone(void* arg, grpc_error_handle error); static void ClientLoadReportDone(void* arg, grpc_error_handle error);
static void OnInitialRequestSent(void* arg, grpc_error_handle error); static void OnInitialRequestSent(void* arg, grpc_error_handle error);
static void OnBalancerMessageReceived(void* arg, grpc_error_handle error); static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
static void OnBalancerStatusReceived(void* arg, grpc_error_handle error); static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
void MaybeSendClientLoadReportLocked(grpc_error_handle error);
void ClientLoadReportDoneLocked(grpc_error_handle error); void ClientLoadReportDoneLocked(grpc_error_handle error);
void OnInitialRequestSentLocked(); void OnInitialRequestSentLocked();
void OnBalancerMessageReceivedLocked(); void OnBalancerMessageReceivedLocked();
@ -263,11 +253,13 @@ class GrpcLb : public LoadBalancingPolicy {
// Created after the first serverlist is received. // Created after the first serverlist is received.
RefCountedPtr<GrpcLbClientStats> client_stats_; RefCountedPtr<GrpcLbClientStats> client_stats_;
Duration client_stats_report_interval_; Duration client_stats_report_interval_;
absl::optional<EventEngine::TaskHandle> client_load_report_handle_; grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
bool last_client_load_report_counters_were_zero_ = false; bool last_client_load_report_counters_were_zero_ = false;
bool client_load_report_is_due_ = false; bool client_load_report_is_due_ = false;
// The closure used for the completion of sending the load report. // The closure used for either the load report timer or the callback for
grpc_closure client_load_report_done_closure_; // completion of sending the load report.
grpc_closure client_load_report_closure_;
}; };
class SubchannelWrapper : public DelegatingSubchannel { class SubchannelWrapper : public DelegatingSubchannel {
@ -831,7 +823,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx); OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived, GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx); this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone, GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx); this, grpc_schedule_on_exec_ctx);
const Timestamp deadline = const Timestamp deadline =
grpclb_policy()->lb_call_timeout_ == Duration::Zero() grpclb_policy()->lb_call_timeout_ == Duration::Zero()
@ -874,9 +866,8 @@ void GrpcLb::BalancerCallState::Orphan() {
// up. Otherwise, we are here because grpclb_policy has to orphan a failed // up. Otherwise, we are here because grpclb_policy has to orphan a failed
// call, then the following cancellation will be a no-op. // call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_); grpc_call_cancel_internal(lb_call_);
if (client_load_report_handle_.has_value() && if (client_load_report_timer_callback_pending_) {
GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { grpc_timer_cancel(&client_load_report_timer_);
Unref(DEBUG_LOCATION, "client_load_report cancelled");
} }
// Note that the initial ref is hold by lb_on_balancer_status_received_ // Note that the initial ref is hold by lb_on_balancer_status_received_
// instead of the caller of this function. So the corresponding unref happens // instead of the caller of this function. So the corresponding unref happens
@ -960,20 +951,34 @@ void GrpcLb::BalancerCallState::StartQuery() {
} }
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_handle_ = GetDefaultEventEngine()->RunAt( // InvalidateNow to avoid getting stuck re-initializing this timer
absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()), // in a loop while draining the currently-held WorkSerializer.
[this] { MaybeSendClientLoadReport(); }); // Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
const Timestamp next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
&client_load_report_closure_);
client_load_report_timer_callback_pending_ = true;
} }
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() { void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(
grpclb_policy()->work_serializer()->Run( void* arg, grpc_error_handle error) {
[this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); },
DEBUG_LOCATION);
} }
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
client_load_report_handle_.reset(); grpc_error_handle error) {
if (this != grpclb_policy()->lb_calld_.get()) { client_load_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
Unref(DEBUG_LOCATION, "client_load_report"); Unref(DEBUG_LOCATION, "client_load_report");
GRPC_ERROR_UNREF(error);
return; return;
} }
// If we've already sent the initial request, then we can go ahead and send // If we've already sent the initial request, then we can go ahead and send
@ -1026,8 +1031,10 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE; op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_message_payload_; op.data.send_message.send_message = send_message_payload_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute( grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_done_closure_); lb_call_, &op, 1, &client_load_report_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[grpclb %p] lb_calld=%p call_error=%d sending client load report", "[grpclb %p] lb_calld=%p call_error=%d sending client load report",

Loading…
Cancel
Save