@ -18,8 +18,14 @@
# include <grpc/support/port_platform.h>
# include <functional>
# include <memory>
# include <utility>
# include "absl/base/thread_annotations.h"
# include "absl/status/status.h"
# include "absl/types/optional.h"
# include <grpc/event_engine/event_engine.h>
# include <grpc/grpc.h>
# include <grpc/support/log.h>
# include <grpc/support/sync.h>
@ -28,22 +34,28 @@
# include <grpcpp/completion_queue.h>
# include <grpcpp/impl/completion_queue_tag.h>
# include "src/core/lib/event_engine/default_event_engine.h"
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/gprpp/time.h"
# include "src/core/lib/iomgr/closure.h"
# include "src/core/lib/iomgr/error.h"
# include "src/core/lib/iomgr/exec_ctx.h"
# include "src/core/lib/iomgr/executor.h"
# include "src/core/lib/iomgr/timer.h"
# include "src/core/lib/surface/completion_queue.h"
namespace grpc {
namespace internal {
namespace {
using : : grpc_event_engine : : experimental : : EventEngine ;
} // namespace
class AlarmImpl : public grpc : : internal : : CompletionQueueTag {
public :
AlarmImpl ( ) : cq_ ( nullptr ) , tag_ ( nullptr ) {
AlarmImpl ( )
: event_engine_ ( grpc_event_engine : : experimental : : GetDefaultEventEngine ( ) ) ,
cq_ ( nullptr ) ,
tag_ ( nullptr ) {
gpr_ref_init ( & refs_ , 1 ) ;
grpc_timer_init_unset ( & timer_ ) ;
}
~ AlarmImpl ( ) override { }
bool FinalizeResult ( void * * tag , bool * /*status*/ ) override {
@ -52,61 +64,46 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
return true ;
}
void Set ( grpc : : CompletionQueue * cq , gpr_timespec deadline , void * tag ) {
grpc_core : : ApplicationCallbackExecCtx callback_exec_ctx ;
grpc_core : : MutexLock lock ( & mu_ ) ;
GPR_ASSERT ( ! cq_timer_handle_ . has_value ( ) & &
! callback_timer_handle_ . has_value ( ) ) ;
grpc_core : : ExecCtx exec_ctx ;
GRPC_CQ_INTERNAL_REF ( cq - > cq ( ) , " alarm " ) ;
cq_ = cq - > cq ( ) ;
tag_ = tag ;
GPR_ASSERT ( grpc_cq_begin_op ( cq_ , this ) ) ;
GRPC_CLOSURE_INIT (
& on_alarm_ ,
[ ] ( void * arg , grpc_error_handle error ) {
// queue the op on the completion queue
AlarmImpl * alarm = static_cast < AlarmImpl * > ( arg ) ;
alarm - > Ref ( ) ;
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue * cq = alarm - > cq_ ;
alarm - > cq_ = nullptr ;
grpc_cq_end_op (
cq , alarm , error ,
[ ] ( void * /*arg*/ , grpc_cq_completion * /*completion*/ ) { } , arg ,
& alarm - > completion_ ) ;
GRPC_CQ_INTERNAL_UNREF ( cq , " alarm " ) ;
} ,
this , grpc_schedule_on_exec_ctx ) ;
grpc_timer_init ( & timer_ ,
grpc_core : : Timestamp : : FromTimespecRoundUp ( deadline ) ,
& on_alarm_ ) ;
Ref ( ) ;
cq_timer_handle_ = event_engine_ - > RunAfter (
grpc_core : : Timestamp : : FromTimespecRoundUp ( deadline ) -
grpc_core : : ExecCtx : : Get ( ) - > Now ( ) ,
[ this ] { OnCQAlarm ( absl : : OkStatus ( ) ) ; } ) ;
}
void Set ( gpr_timespec deadline , std : : function < void ( bool ) > f ) {
grpc_core : : ApplicationCallbackExecCtx callback_exec_ctx ;
grpc_core : : MutexLock lock ( & mu_ ) ;
GPR_ASSERT ( ! cq_timer_handle_ . has_value ( ) & &
! callback_timer_handle_ . has_value ( ) ) ;
grpc_core : : ExecCtx exec_ctx ;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std : : move ( f ) ;
Ref ( ) ;
GRPC_CLOSURE_INIT (
& on_alarm_ ,
[ ] ( void * arg , grpc_error_handle error ) {
grpc_core : : Executor : : Run ( GRPC_CLOSURE_CREATE (
[ ] ( void * arg , grpc_error_handle error ) {
AlarmImpl * alarm =
static_cast < AlarmImpl * > ( arg ) ;
alarm - > callback_ ( error . ok ( ) ) ;
alarm - > Unref ( ) ;
} ,
arg , nullptr ) ,
error ) ;
} ,
this , grpc_schedule_on_exec_ctx ) ;
grpc_timer_init ( & timer_ ,
grpc_core : : Timestamp : : FromTimespecRoundUp ( deadline ) ,
& on_alarm_ ) ;
callback_timer_handle_ = event_engine_ - > RunAfter (
grpc_core : : Timestamp : : FromTimespecRoundUp ( deadline ) -
grpc_core : : ExecCtx : : Get ( ) - > Now ( ) ,
[ this ] { OnCallbackAlarm ( true ) ; } ) ;
}
void Cancel ( ) {
grpc_core : : ApplicationCallbackExecCtx callback_exec_ctx ;
grpc_core : : ExecCtx exec_ctx ;
grpc_timer_cancel ( & timer_ ) ;
grpc_core : : MutexLock lock ( & mu_ ) ;
if ( callback_timer_handle_ . has_value ( ) & &
event_engine_ - > Cancel ( * callback_timer_handle_ ) ) {
event_engine_ - > Run ( [ this ] { OnCallbackAlarm ( /*is_ok=*/ false ) ; } ) ;
callback_timer_handle_ . reset ( ) ;
} else if ( cq_timer_handle_ . has_value ( ) & &
event_engine_ - > Cancel ( * cq_timer_handle_ ) ) {
event_engine_ - > Run (
[ this ] { OnCQAlarm ( absl : : CancelledError ( " cancelled " ) ) ; } ) ;
cq_timer_handle_ . reset ( ) ;
}
}
void Destroy ( ) {
Cancel ( ) ;
@ -114,6 +111,35 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
}
private :
void OnCQAlarm ( grpc_error_handle error ) {
{
grpc_core : : MutexLock lock ( & mu_ ) ;
cq_timer_handle_ . reset ( ) ;
}
grpc_core : : ApplicationCallbackExecCtx callback_exec_ctx ;
grpc_core : : ExecCtx exec_ctx ;
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue * cq = cq_ ;
cq_ = nullptr ;
grpc_cq_end_op (
cq , this , error ,
[ ] ( void * /*arg*/ , grpc_cq_completion * /*completion*/ ) { } , nullptr ,
& completion_ ) ;
GRPC_CQ_INTERNAL_UNREF ( cq , " alarm " ) ;
}
void OnCallbackAlarm ( bool is_ok ) {
{
grpc_core : : MutexLock lock ( & mu_ ) ;
callback_timer_handle_ . reset ( ) ;
}
grpc_core : : ApplicationCallbackExecCtx callback_exec_ctx ;
grpc_core : : ExecCtx exec_ctx ;
callback_ ( is_ok ) ;
Unref ( ) ;
}
void Ref ( ) { gpr_ref ( & refs_ ) ; }
void Unref ( ) {
if ( gpr_unref ( & refs_ ) ) {
@ -121,9 +147,12 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
}
}
grpc_timer timer_ ;
grpc_core : : Mutex mu_ ;
std : : shared_ptr < grpc_event_engine : : experimental : : EventEngine > event_engine_ ;
absl : : optional < EventEngine : : TaskHandle > cq_timer_handle_ ABSL_GUARDED_BY ( mu_ ) ;
absl : : optional < EventEngine : : TaskHandle > callback_timer_handle_
ABSL_GUARDED_BY ( mu_ ) ;
gpr_refcount refs_ ;
grpc_closure on_alarm_ ;
grpc_cq_completion completion_ ;
// completion queue where events about this alarm will be posted
grpc_completion_queue * cq_ ;