From 8ee542daba6a72b071ca2ee93f55b884194d8b45 Mon Sep 17 00:00:00 2001 From: Yijie Ma Date: Thu, 12 Jan 2023 12:31:37 -0800 Subject: [PATCH] [EventEngine] RunAfter migration: passthru_endpoint (#32034) * initial commit * compile * iwyu * pure guess fix to unblock api_fuzzer test * early out hack in passthru_endpoint * this is no use * cancel semantic fix * fix * fix * clarify --- test/core/util/passthru_endpoint.cc | 37 ++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 31c4ecd2ab6..79651a3919e 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -22,24 +22,30 @@ #include #include +#include #include #include "absl/status/status.h" #include "absl/strings/str_format.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include #include #include #include #include +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_fwd.h" -#include "src/core/lib/iomgr/timer.h" + +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; typedef struct passthru_endpoint passthru_endpoint; @@ -51,7 +57,7 @@ typedef struct { } pending_op; typedef struct { - grpc_timer timer; + absl::optional timer_handle; uint64_t allowed_write_bytes; uint64_t allowed_read_bytes; std::vector actions; @@ -348,6 +354,8 @@ void grpc_passthru_endpoint_destroy(passthru_endpoint* p) { gpr_free(p); } +static void do_next_sched_channel_action(void* arg, grpc_error_handle error); + static void me_destroy(grpc_endpoint* ep) { passthru_endpoint* p = (reinterpret_cast(ep))->parent; gpr_mu_lock(&p->mu); @@ -357,7 +365,16 @@ static void me_destroy(grpc_endpoint* ep) { grpc_passthru_endpoint_destroy(p); } else { if (p->halves == 0 && p->simulate_channel_actions) { - grpc_timer_cancel(&p->channel_effects->timer); + if (p->channel_effects->timer_handle.has_value()) { + if (GetDefaultEventEngine()->Cancel( + *p->channel_effects->timer_handle)) { + gpr_mu_unlock(&p->mu); + // This will destroy the passthru endpoint so just return after that. + do_next_sched_channel_action(ep, absl::CancelledError()); + return; + } + p->channel_effects->timer_handle.reset(); + } } gpr_mu_unlock(&p->mu); } @@ -483,12 +500,14 @@ static void sched_next_channel_action_locked(half* m) { shutdown_locked(m, err); return; } - grpc_timer_init(&m->parent->channel_effects->timer, - grpc_core::Duration::Milliseconds( - m->parent->channel_effects->actions[0].wait_ms) + - grpc_core::Timestamp::Now(), - GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m, - grpc_schedule_on_exec_ctx)); + m->parent->channel_effects->timer_handle = GetDefaultEventEngine()->RunAfter( + grpc_core::Duration::Milliseconds( + m->parent->channel_effects->actions[0].wait_ms), + [m] { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + do_next_sched_channel_action(m, absl::OkStatus()); + }); } void start_scheduling_grpc_passthru_endpoint_channel_effects(