From db01bf793aeab78b8b8d85686977240afb56a536 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 11 Sep 2018 17:01:19 -0700 Subject: [PATCH 1/2] Add callback-based alarms --- include/grpcpp/alarm.h | 24 ++++++++ src/cpp/common/alarm.cc | 47 +++++++++++---- test/cpp/common/alarm_test.cc | 106 +++++++++++++++++++++++++++++++++- 3 files changed, 165 insertions(+), 12 deletions(-) diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index f484610a6ed..f9008c327e4 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -21,6 +21,8 @@ #ifndef GRPCPP_ALARM_H #define GRPCPP_ALARM_H +#include + #include #include #include @@ -76,8 +78,30 @@ class Alarm : private GrpcLibraryCodegen { /// has already fired has no effect. void Cancel(); + /// NOTE: class experimental_type is not part of the public API of this class + /// TODO(vjpai): Move these contents to the public API of Alarm when + /// they are no longer experimental + class experimental_type { + public: + explicit experimental_type(Alarm* alarm) : alarm_(alarm) {} + + template + void Set(const T& deadline, std::function f) { + alarm_->SetInternal(TimePoint(deadline).raw_time(), std::move(f)); + } + + private: + Alarm* alarm_; + }; + + /// NOTE: The function experimental() is not stable public API. It is a view + /// to the experimental components of this class. It may be changed or removed + /// at any time. + experimental_type experimental() { return experimental_type(this); } + private: void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag); + void SetInternal(gpr_timespec deadline, std::function f); internal::CompletionQueueTag* alarm_; }; diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 15a373d8a54..5819a4210bd 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag { AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast(arg); - alarm->Ref(); - grpc_cq_end_op( - alarm->cq_, alarm, error, - [](void* arg, grpc_cq_completion* completion) {}, - arg, &alarm->completion_); - }, - this, grpc_schedule_on_exec_ctx); } ~AlarmImpl() { grpc_core::ExecCtx exec_ctx; @@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + grpc_cq_end_op( + alarm->cq_, alarm, error, + [](void* arg, grpc_cq_completion* completion) {}, + arg, &alarm->completion_); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), + &on_alarm_); + } + void Set(gpr_timespec deadline, std::function f) { + grpc_core::ExecCtx exec_ctx; + // Don't use any CQ at all. Instead just use the timer to fire the function + callback_ = std::move(f); + Ref(); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } @@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag { // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; void* tag_; + std::function callback_; }; } // namespace internal @@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { static_cast(alarm_)->Set(cq, deadline, tag); } +void Alarm::SetInternal(gpr_timespec deadline, std::function f) { + // Note that we know that alarm_ is actually an internal::AlarmImpl + // but we declared it as the base pointer to avoid a forward declaration + // or exposing core data structures in the C++ public headers. + // Thus it is safe to use a static_cast to the subclass here, and the + // C++ style guide allows us to do so in this case + static_cast(alarm_)->Set(deadline, std::move(f)); +} + Alarm::~Alarm() { if (alarm_ != nullptr) { static_cast(alarm_)->Destroy(); diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_test.cc index 57d958349ec..e909d03658c 100644 --- a/test/cpp/common/alarm_test.cc +++ b/test/cpp/common/alarm_test.cc @@ -16,9 +16,13 @@ * */ +#include +#include +#include +#include + #include #include -#include #include @@ -43,6 +47,66 @@ TEST(AlarmTest, RegularExpiry) { EXPECT_EQ(junk, output_tag); } +struct Completion { + bool completed = false; + std::mutex mu; + std::condition_variable cv; +}; + +TEST(AlarmTest, CallbackRegularExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(1), [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + +TEST(AlarmTest, CallbackZeroExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set(grpc_timeout_seconds_to_deadline(0), [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + +TEST(AlarmTest, CallbackNegativeExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(-1), + [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + TEST(AlarmTest, MultithreadedRegularExpiry) { CompletionQueue cq; void* junk = reinterpret_cast(1618033); @@ -182,6 +246,26 @@ TEST(AlarmTest, Cancellation) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, CallbackCancellation) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(10), + [c](bool ok) { + EXPECT_FALSE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + alarm.Cancel(); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(1), + [c] { return c->completed; })); +} + TEST(AlarmTest, SetDestruction) { CompletionQueue cq; void* junk = reinterpret_cast(1618033); @@ -200,6 +284,26 @@ TEST(AlarmTest, SetDestruction) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, CallbackSetDestruction) { + auto c = std::make_shared(); + { + Alarm alarm; + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(10), + [c](bool ok) { + EXPECT_FALSE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + } + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(1), + [c] { return c->completed; })); +} + TEST(AlarmTest, UnsetDestruction) { CompletionQueue cq; Alarm alarm; From 1359543b47c0ecff8ecd50ae4d3a2989e16a8a94 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 19 Sep 2018 09:39:21 -0700 Subject: [PATCH 2/2] Add a comment to address reviewer feedback --- include/grpcpp/alarm.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index f9008c327e4..365feb4eb95 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -85,6 +85,9 @@ class Alarm : private GrpcLibraryCodegen { public: explicit experimental_type(Alarm* alarm) : alarm_(alarm) {} + /// Set an alarm to invoke callback \a f. The argument to the callback + /// states whether the alarm expired at \a deadline (true) or was cancelled + /// (false) template void Set(const T& deadline, std::function f) { alarm_->SetInternal(TimePoint(deadline).raw_time(), std::move(f));