Allow an alarm to be set again after firing

pull/17933/head
Vijay Pai 6 years ago
parent 96403bc640
commit 45c684f894
  1. 5
      include/grpcpp/alarm_impl.h
  2. 14
      src/cpp/common/alarm.cc
  3. 38
      test/cpp/common/alarm_test.cc

@ -16,8 +16,8 @@
*
*/
/// An Alarm posts the user provided tag to its associated completion queue upon
/// expiry or cancellation.
/// An Alarm posts the user-provided tag to its associated completion queue or
/// invokes the user-provided function on expiry or cancellation.
#ifndef GRPCPP_ALARM_IMPL_H
#define GRPCPP_ALARM_IMPL_H
@ -32,7 +32,6 @@
namespace grpc_impl {
/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
class Alarm : private ::grpc::GrpcLibraryCodegen {
public:
/// Create an unset completion queue alarm

@ -40,12 +40,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
}
~AlarmImpl() {
grpc_core::ExecCtx exec_ctx;
if (cq_ != nullptr) {
GRPC_CQ_INTERNAL_UNREF(cq_, "alarm");
}
}
~AlarmImpl() {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
Unref();
@ -63,10 +58,15 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
// queue the op on the completion queue
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue* cq = alarm->cq_;
alarm->cq_ = nullptr;
grpc_cq_end_op(
alarm->cq_, alarm, error,
cq, alarm, error,
[](void* arg, grpc_cq_completion* completion) {},
arg, &alarm->completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),

@ -47,6 +47,44 @@ TEST(AlarmTest, RegularExpiry) {
EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, RegularExpiryMultiSet) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm;
for (int i = 0; i < 3; i++) {
alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status =
cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
EXPECT_EQ(junk, output_tag);
}
}
TEST(AlarmTest, RegularExpiryMultiSetMultiCQ) {
void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm;
for (int i = 0; i < 3; i++) {
CompletionQueue cq;
alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status =
cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
EXPECT_EQ(junk, output_tag);
}
}
struct Completion {
bool completed = false;
std::mutex mu;

Loading…
Cancel
Save