Revert "Revert "EventEngine::RunAt - grpclb load report timer"" (#29831)

* Revert "Revert "EventEngine::RunAt - grpclb load report timer (#29719)" (#29821)"

This reverts commit 18307587b4.

* add an exec-ctx to the callback

* fix sanity checks
pull/29845/head
Vignesh Babu 3 years ago committed by GitHub
parent 6d33399d1e
commit b073407d84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 69
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -3498,12 +3498,13 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
],
external_deps = [
"absl/memory",
"absl/container:inlined_vector",
"absl/memory",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"absl/types:variant",
"upb_lib",
@ -3514,6 +3515,7 @@ grpc_cc_library(
"channel_stack_type",
"config",
"debug_location",
"default_event_engine_factory_hdrs",
"error",
"gpr_base",
"gpr_codegen",

@ -53,6 +53,8 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include <grpc/event_engine/event_engine.h>
// IWYU pragma: no_include <sys/socket.h>
#include <inttypes.h>
@ -76,6 +78,8 @@
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "upb/upb.hpp"
@ -109,6 +113,7 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -155,6 +160,9 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr char kGrpclb[] = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config {
@ -212,13 +220,15 @@ class GrpcLb : public LoadBalancingPolicy {
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error);
// EventEngine callbacks
void MaybeSendClientLoadReport();
void MaybeSendClientLoadReportLocked();
static void ClientLoadReportDone(void* arg, grpc_error_handle error);
static void OnInitialRequestSent(void* arg, grpc_error_handle error);
static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
void MaybeSendClientLoadReportLocked(grpc_error_handle error);
void ClientLoadReportDoneLocked(grpc_error_handle error);
void OnInitialRequestSentLocked();
void OnBalancerMessageReceivedLocked();
@ -253,13 +263,11 @@ class GrpcLb : public LoadBalancingPolicy {
// Created after the first serverlist is received.
RefCountedPtr<GrpcLbClientStats> client_stats_;
Duration client_stats_report_interval_;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
absl::optional<EventEngine::TaskHandle> client_load_report_handle_;
bool last_client_load_report_counters_were_zero_ = false;
bool client_load_report_is_due_ = false;
// The closure used for either the load report timer or the callback for
// completion of sending the load report.
grpc_closure client_load_report_closure_;
// The closure used for the completion of sending the load report.
grpc_closure client_load_report_done_closure_;
};
class SubchannelWrapper : public DelegatingSubchannel {
@ -823,7 +831,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone,
this, grpc_schedule_on_exec_ctx);
const Timestamp deadline =
grpclb_policy()->lb_call_timeout_ == Duration::Zero()
@ -866,8 +874,9 @@ void GrpcLb::BalancerCallState::Orphan() {
// up. Otherwise, we are here because grpclb_policy has to orphan a failed
// call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_);
if (client_load_report_timer_callback_pending_) {
grpc_timer_cancel(&client_load_report_timer_);
if (client_load_report_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) {
Unref(DEBUG_LOCATION, "client_load_report cancelled");
}
// Note that the initial ref is hold by lb_on_balancer_status_received_
// instead of the caller of this function. So the corresponding unref happens
@ -951,34 +960,24 @@ void GrpcLb::BalancerCallState::StartQuery() {
}
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
const Timestamp next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
&client_load_report_closure_);
client_load_report_timer_callback_pending_ = true;
client_load_report_handle_ = GetDefaultEventEngine()->RunAt(
absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()),
[this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
MaybeSendClientLoadReport();
});
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(
void* arg, grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); },
DEBUG_LOCATION);
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() {
grpclb_policy()->work_serializer()->Run(
[this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
grpc_error_handle error) {
client_load_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
client_load_report_handle_.reset();
if (this != grpclb_policy()->lb_calld_.get()) {
Unref(DEBUG_LOCATION, "client_load_report");
GRPC_ERROR_UNREF(error);
return;
}
// If we've already sent the initial request, then we can go ahead and send
@ -1031,10 +1030,8 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_message_payload_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
lb_call_, &op, 1, &client_load_report_done_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR,
"[grpclb %p] lb_calld=%p call_error=%d sending client load report",

Loading…
Cancel
Save