diff --git a/include/grpcpp/alarm_impl.h b/include/grpcpp/alarm_impl.h index 7844e7c8866..543dcd82a4c 100644 --- a/include/grpcpp/alarm_impl.h +++ b/include/grpcpp/alarm_impl.h @@ -16,8 +16,8 @@ * */ -/// An Alarm posts the user provided tag to its associated completion queue upon -/// expiry or cancellation. +/// An Alarm posts the user-provided tag to its associated completion queue or +/// invokes the user-provided function on expiry or cancellation. #ifndef GRPCPP_ALARM_IMPL_H #define GRPCPP_ALARM_IMPL_H @@ -32,7 +32,6 @@ namespace grpc_impl { -/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h). class Alarm : private ::grpc::GrpcLibraryCodegen { public: /// Create an unset completion queue alarm diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 6bfe26f04c4..dbec80cde4f 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -40,12 +40,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); } - ~AlarmImpl() { - grpc_core::ExecCtx exec_ctx; - if (cq_ != nullptr) { - GRPC_CQ_INTERNAL_UNREF(cq_, "alarm"); - } - } + ~AlarmImpl() {} bool FinalizeResult(void** tag, bool* status) override { *tag = tag_; Unref(); @@ -63,10 +58,15 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { // queue the op on the completion queue AlarmImpl* alarm = static_cast(arg); alarm->Ref(); + // Preserve the cq and reset the cq_ so that the alarm + // can be reset when the alarm tag is delivered. + grpc_completion_queue* cq = alarm->cq_; + alarm->cq_ = nullptr; grpc_cq_end_op( - alarm->cq_, alarm, error, + cq, alarm, error, [](void* arg, grpc_cq_completion* completion) {}, arg, &alarm->completion_); + GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); }, this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_test.cc index 802cdc209a0..4d410a5d460 100644 --- a/test/cpp/common/alarm_test.cc +++ b/test/cpp/common/alarm_test.cc @@ -47,6 +47,44 @@ TEST(AlarmTest, RegularExpiry) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, RegularExpiryMultiSet) { + CompletionQueue cq; + void* junk = reinterpret_cast(1618033); + Alarm alarm; + + for (int i = 0; i < 3; i++) { + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = + cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); + } +} + +TEST(AlarmTest, RegularExpiryMultiSetMultiCQ) { + void* junk = reinterpret_cast(1618033); + Alarm alarm; + + for (int i = 0; i < 3; i++) { + CompletionQueue cq; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = + cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); + } +} + struct Completion { bool completed = false; std::mutex mu;