[EventEngine] RunAfter migration: passthru_endpoint (#32034)

* initial commit

* compile

* iwyu

* pure guess fix to unblock api_fuzzer test

* early out hack in passthru_endpoint

* this is no use

* cancel semantic fix

* fix

* fix

* clarify
pull/31925/merge
Yijie Ma 2 years ago committed by GitHub
parent b2e52d6e24
commit 8ee542daba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      test/core/util/passthru_endpoint.cc

@ -22,24 +22,30 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/timer.h"
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
typedef struct passthru_endpoint passthru_endpoint;
@ -51,7 +57,7 @@ typedef struct {
} pending_op;
typedef struct {
grpc_timer timer;
absl::optional<EventEngine::TaskHandle> timer_handle;
uint64_t allowed_write_bytes;
uint64_t allowed_read_bytes;
std::vector<grpc_passthru_endpoint_channel_action> actions;
@ -348,6 +354,8 @@ void grpc_passthru_endpoint_destroy(passthru_endpoint* p) {
gpr_free(p);
}
static void do_next_sched_channel_action(void* arg, grpc_error_handle error);
static void me_destroy(grpc_endpoint* ep) {
passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
gpr_mu_lock(&p->mu);
@ -357,7 +365,16 @@ static void me_destroy(grpc_endpoint* ep) {
grpc_passthru_endpoint_destroy(p);
} else {
if (p->halves == 0 && p->simulate_channel_actions) {
grpc_timer_cancel(&p->channel_effects->timer);
if (p->channel_effects->timer_handle.has_value()) {
if (GetDefaultEventEngine()->Cancel(
*p->channel_effects->timer_handle)) {
gpr_mu_unlock(&p->mu);
// This will destroy the passthru endpoint so just return after that.
do_next_sched_channel_action(ep, absl::CancelledError());
return;
}
p->channel_effects->timer_handle.reset();
}
}
gpr_mu_unlock(&p->mu);
}
@ -483,12 +500,14 @@ static void sched_next_channel_action_locked(half* m) {
shutdown_locked(m, err);
return;
}
grpc_timer_init(&m->parent->channel_effects->timer,
grpc_core::Duration::Milliseconds(
m->parent->channel_effects->actions[0].wait_ms) +
grpc_core::Timestamp::Now(),
GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m,
grpc_schedule_on_exec_ctx));
m->parent->channel_effects->timer_handle = GetDefaultEventEngine()->RunAfter(
grpc_core::Duration::Milliseconds(
m->parent->channel_effects->actions[0].wait_ms),
[m] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
do_next_sched_channel_action(m, absl::OkStatus());
});
}
void start_scheduling_grpc_passthru_endpoint_channel_effects(

Loading…
Cancel
Save