|
|
|
@ -18,14 +18,8 @@ |
|
|
|
|
#include <grpc/support/port_platform.h> |
|
|
|
|
|
|
|
|
|
#include <functional> |
|
|
|
|
#include <memory> |
|
|
|
|
#include <utility> |
|
|
|
|
|
|
|
|
|
#include "absl/base/thread_annotations.h" |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
|
#include "absl/types/optional.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/event_engine/event_engine.h> |
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/sync.h> |
|
|
|
@ -34,28 +28,22 @@ |
|
|
|
|
#include <grpcpp/completion_queue.h> |
|
|
|
|
#include <grpcpp/impl/completion_queue_tag.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/event_engine/default_event_engine.h" |
|
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
|
#include "src/core/lib/gprpp/time.h" |
|
|
|
|
#include "src/core/lib/iomgr/closure.h" |
|
|
|
|
#include "src/core/lib/iomgr/error.h" |
|
|
|
|
#include "src/core/lib/iomgr/exec_ctx.h" |
|
|
|
|
#include "src/core/lib/iomgr/executor.h" |
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
#include "src/core/lib/surface/completion_queue.h" |
|
|
|
|
|
|
|
|
|
namespace grpc { |
|
|
|
|
|
|
|
|
|
namespace internal { |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
using ::grpc_event_engine::experimental::EventEngine; |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
class AlarmImpl : public grpc::internal::CompletionQueueTag { |
|
|
|
|
public: |
|
|
|
|
AlarmImpl() |
|
|
|
|
: event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()), |
|
|
|
|
cq_(nullptr), |
|
|
|
|
tag_(nullptr) { |
|
|
|
|
AlarmImpl() : cq_(nullptr), tag_(nullptr) { |
|
|
|
|
gpr_ref_init(&refs_, 1); |
|
|
|
|
grpc_timer_init_unset(&timer_); |
|
|
|
|
} |
|
|
|
|
~AlarmImpl() override {} |
|
|
|
|
bool FinalizeResult(void** tag, bool* /*status*/) override { |
|
|
|
@ -64,46 +52,61 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
GPR_ASSERT(!cq_timer_handle_.has_value() && |
|
|
|
|
!callback_timer_handle_.has_value()); |
|
|
|
|
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); |
|
|
|
|
cq_ = cq->cq(); |
|
|
|
|
tag_ = tag; |
|
|
|
|
GPR_ASSERT(grpc_cq_begin_op(cq_, this)); |
|
|
|
|
Ref(); |
|
|
|
|
cq_timer_handle_ = event_engine_->RunAfter( |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp(deadline) - |
|
|
|
|
grpc_core::ExecCtx::Get()->Now(), |
|
|
|
|
[this] { OnCQAlarm(absl::OkStatus()); }); |
|
|
|
|
GRPC_CLOSURE_INIT( |
|
|
|
|
&on_alarm_, |
|
|
|
|
[](void* arg, grpc_error_handle error) { |
|
|
|
|
// queue the op on the completion queue
|
|
|
|
|
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); |
|
|
|
|
alarm->Ref(); |
|
|
|
|
// Preserve the cq and reset the cq_ so that the alarm
|
|
|
|
|
// can be reset when the alarm tag is delivered.
|
|
|
|
|
grpc_completion_queue* cq = alarm->cq_; |
|
|
|
|
alarm->cq_ = nullptr; |
|
|
|
|
grpc_cq_end_op( |
|
|
|
|
cq, alarm, error, |
|
|
|
|
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, |
|
|
|
|
&alarm->completion_); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); |
|
|
|
|
}, |
|
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&timer_, |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp(deadline), |
|
|
|
|
&on_alarm_); |
|
|
|
|
} |
|
|
|
|
void Set(gpr_timespec deadline, std::function<void(bool)> f) { |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
GPR_ASSERT(!cq_timer_handle_.has_value() && |
|
|
|
|
!callback_timer_handle_.has_value()); |
|
|
|
|
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
// Don't use any CQ at all. Instead just use the timer to fire the function
|
|
|
|
|
callback_ = std::move(f); |
|
|
|
|
Ref(); |
|
|
|
|
callback_timer_handle_ = event_engine_->RunAfter( |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp(deadline) - |
|
|
|
|
grpc_core::ExecCtx::Get()->Now(), |
|
|
|
|
[this] { OnCallbackAlarm(true); }); |
|
|
|
|
GRPC_CLOSURE_INIT( |
|
|
|
|
&on_alarm_, |
|
|
|
|
[](void* arg, grpc_error_handle error) { |
|
|
|
|
grpc_core::Executor::Run(GRPC_CLOSURE_CREATE( |
|
|
|
|
[](void* arg, grpc_error_handle error) { |
|
|
|
|
AlarmImpl* alarm = |
|
|
|
|
static_cast<AlarmImpl*>(arg); |
|
|
|
|
alarm->callback_(error.ok()); |
|
|
|
|
alarm->Unref(); |
|
|
|
|
}, |
|
|
|
|
arg, nullptr), |
|
|
|
|
error); |
|
|
|
|
}, |
|
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&timer_, |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp(deadline), |
|
|
|
|
&on_alarm_); |
|
|
|
|
} |
|
|
|
|
void Cancel() { |
|
|
|
|
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
if (callback_timer_handle_.has_value() && |
|
|
|
|
event_engine_->Cancel(*callback_timer_handle_)) { |
|
|
|
|
event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); }); |
|
|
|
|
callback_timer_handle_.reset(); |
|
|
|
|
} else if (cq_timer_handle_.has_value() && |
|
|
|
|
event_engine_->Cancel(*cq_timer_handle_)) { |
|
|
|
|
event_engine_->Run( |
|
|
|
|
[this] { OnCQAlarm(absl::CancelledError("cancelled")); }); |
|
|
|
|
cq_timer_handle_.reset(); |
|
|
|
|
} |
|
|
|
|
grpc_timer_cancel(&timer_); |
|
|
|
|
} |
|
|
|
|
void Destroy() { |
|
|
|
|
Cancel(); |
|
|
|
@ -111,35 +114,6 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void OnCQAlarm(grpc_error_handle error) { |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
cq_timer_handle_.reset(); |
|
|
|
|
} |
|
|
|
|
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
// Preserve the cq and reset the cq_ so that the alarm
|
|
|
|
|
// can be reset when the alarm tag is delivered.
|
|
|
|
|
grpc_completion_queue* cq = cq_; |
|
|
|
|
cq_ = nullptr; |
|
|
|
|
grpc_cq_end_op( |
|
|
|
|
cq, this, error, |
|
|
|
|
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr, |
|
|
|
|
&completion_); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void OnCallbackAlarm(bool is_ok) { |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
callback_timer_handle_.reset(); |
|
|
|
|
} |
|
|
|
|
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
callback_(is_ok); |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Ref() { gpr_ref(&refs_); } |
|
|
|
|
void Unref() { |
|
|
|
|
if (gpr_unref(&refs_)) { |
|
|
|
@ -147,12 +121,9 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_core::Mutex mu_; |
|
|
|
|
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; |
|
|
|
|
absl::optional<EventEngine::TaskHandle> cq_timer_handle_ ABSL_GUARDED_BY(mu_); |
|
|
|
|
absl::optional<EventEngine::TaskHandle> callback_timer_handle_ |
|
|
|
|
ABSL_GUARDED_BY(mu_); |
|
|
|
|
grpc_timer timer_; |
|
|
|
|
gpr_refcount refs_; |
|
|
|
|
grpc_closure on_alarm_; |
|
|
|
|
grpc_cq_completion completion_; |
|
|
|
|
// completion queue where events about this alarm will be posted
|
|
|
|
|
grpc_completion_queue* cq_; |
|
|
|
|