Merge pull request #16624 from vjpai/callback_alarms

EXPERIMENTAL: Add callback-based alarms
pull/16657/head
Vijay Pai 6 years ago committed by GitHub
commit 46429a2a6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      include/grpcpp/alarm.h
  2. 47
      src/cpp/common/alarm.cc
  3. 106
      test/cpp/common/alarm_test.cc

@ -21,6 +21,8 @@
#ifndef GRPCPP_ALARM_H #ifndef GRPCPP_ALARM_H
#define GRPCPP_ALARM_H #define GRPCPP_ALARM_H
#include <functional>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/completion_queue_tag.h>
@ -76,8 +78,33 @@ class Alarm : private GrpcLibraryCodegen {
/// has already fired has no effect. /// has already fired has no effect.
void Cancel(); void Cancel();
/// NOTE: class experimental_type is not part of the public API of this class
/// TODO(vjpai): Move these contents to the public API of Alarm when
/// they are no longer experimental
class experimental_type {
public:
explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
/// Set an alarm to invoke callback \a f. The argument to the callback
/// states whether the alarm expired at \a deadline (true) or was cancelled
/// (false)
template <typename T>
void Set(const T& deadline, std::function<void(bool)> f) {
alarm_->SetInternal(TimePoint<T>(deadline).raw_time(), std::move(f));
}
private:
Alarm* alarm_;
};
/// NOTE: The function experimental() is not stable public API. It is a view
/// to the experimental components of this class. It may be changed or removed
/// at any time.
experimental_type experimental() { return experimental_type(this); }
private: private:
void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag); void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag);
void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
internal::CompletionQueueTag* alarm_; internal::CompletionQueueTag* alarm_;
}; };

@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag {
AlarmImpl() : cq_(nullptr), tag_(nullptr) { AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1); gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_); grpc_timer_init_unset(&timer_);
GRPC_CLOSURE_INIT(&on_alarm_,
[](void* arg, grpc_error* error) {
// queue the op on the completion queue
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
grpc_cq_end_op(
alarm->cq_, alarm, error,
[](void* arg, grpc_cq_completion* completion) {},
arg, &alarm->completion_);
},
this, grpc_schedule_on_exec_ctx);
} }
~AlarmImpl() { ~AlarmImpl() {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag {
cq_ = cq->cq(); cq_ = cq->cq();
tag_ = tag; tag_ = tag;
GPR_ASSERT(grpc_cq_begin_op(cq_, this)); GPR_ASSERT(grpc_cq_begin_op(cq_, this));
GRPC_CLOSURE_INIT(&on_alarm_,
[](void* arg, grpc_error* error) {
// queue the op on the completion queue
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
grpc_cq_end_op(
alarm->cq_, alarm, error,
[](void* arg, grpc_cq_completion* completion) {},
arg, &alarm->completion_);
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
Ref();
GRPC_CLOSURE_INIT(&on_alarm_,
[](void* arg, grpc_error* error) {
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->callback_(error == GRPC_ERROR_NONE);
alarm->Unref();
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_); &on_alarm_);
} }
@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag {
// completion queue where events about this alarm will be posted // completion queue where events about this alarm will be posted
grpc_completion_queue* cq_; grpc_completion_queue* cq_;
void* tag_; void* tag_;
std::function<void(bool)> callback_;
}; };
} // namespace internal } // namespace internal
@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) {
static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag); static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
} }
void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
// Note that we know that alarm_ is actually an internal::AlarmImpl
// but we declared it as the base pointer to avoid a forward declaration
// or exposing core data structures in the C++ public headers.
// Thus it is safe to use a static_cast to the subclass here, and the
// C++ style guide allows us to do so in this case
static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
}
Alarm::~Alarm() { Alarm::~Alarm() {
if (alarm_ != nullptr) { if (alarm_ != nullptr) {
static_cast<internal::AlarmImpl*>(alarm_)->Destroy(); static_cast<internal::AlarmImpl*>(alarm_)->Destroy();

@ -16,9 +16,13 @@
* *
*/ */
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <grpcpp/alarm.h> #include <grpcpp/alarm.h>
#include <grpcpp/completion_queue.h> #include <grpcpp/completion_queue.h>
#include <thread>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -43,6 +47,66 @@ TEST(AlarmTest, RegularExpiry) {
EXPECT_EQ(junk, output_tag); EXPECT_EQ(junk, output_tag);
} }
struct Completion {
bool completed = false;
std::mutex mu;
std::condition_variable cv;
};
TEST(AlarmTest, CallbackRegularExpiry) {
Alarm alarm;
auto c = std::make_shared<Completion>();
alarm.experimental().Set(
std::chrono::system_clock::now() + std::chrono::seconds(1), [c](bool ok) {
EXPECT_TRUE(ok);
std::lock_guard<std::mutex> l(c->mu);
c->completed = true;
c->cv.notify_one();
});
std::unique_lock<std::mutex> l(c->mu);
EXPECT_TRUE(c->cv.wait_until(
l, std::chrono::system_clock::now() + std::chrono::seconds(10),
[c] { return c->completed; }));
}
TEST(AlarmTest, CallbackZeroExpiry) {
Alarm alarm;
auto c = std::make_shared<Completion>();
alarm.experimental().Set(grpc_timeout_seconds_to_deadline(0), [c](bool ok) {
EXPECT_TRUE(ok);
std::lock_guard<std::mutex> l(c->mu);
c->completed = true;
c->cv.notify_one();
});
std::unique_lock<std::mutex> l(c->mu);
EXPECT_TRUE(c->cv.wait_until(
l, std::chrono::system_clock::now() + std::chrono::seconds(10),
[c] { return c->completed; }));
}
TEST(AlarmTest, CallbackNegativeExpiry) {
Alarm alarm;
auto c = std::make_shared<Completion>();
alarm.experimental().Set(
std::chrono::system_clock::now() + std::chrono::seconds(-1),
[c](bool ok) {
EXPECT_TRUE(ok);
std::lock_guard<std::mutex> l(c->mu);
c->completed = true;
c->cv.notify_one();
});
std::unique_lock<std::mutex> l(c->mu);
EXPECT_TRUE(c->cv.wait_until(
l, std::chrono::system_clock::now() + std::chrono::seconds(10),
[c] { return c->completed; }));
}
TEST(AlarmTest, MultithreadedRegularExpiry) { TEST(AlarmTest, MultithreadedRegularExpiry) {
CompletionQueue cq; CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033); void* junk = reinterpret_cast<void*>(1618033);
@ -182,6 +246,26 @@ TEST(AlarmTest, Cancellation) {
EXPECT_EQ(junk, output_tag); EXPECT_EQ(junk, output_tag);
} }
TEST(AlarmTest, CallbackCancellation) {
Alarm alarm;
auto c = std::make_shared<Completion>();
alarm.experimental().Set(
std::chrono::system_clock::now() + std::chrono::seconds(10),
[c](bool ok) {
EXPECT_FALSE(ok);
std::lock_guard<std::mutex> l(c->mu);
c->completed = true;
c->cv.notify_one();
});
alarm.Cancel();
std::unique_lock<std::mutex> l(c->mu);
EXPECT_TRUE(c->cv.wait_until(
l, std::chrono::system_clock::now() + std::chrono::seconds(1),
[c] { return c->completed; }));
}
TEST(AlarmTest, SetDestruction) { TEST(AlarmTest, SetDestruction) {
CompletionQueue cq; CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033); void* junk = reinterpret_cast<void*>(1618033);
@ -200,6 +284,26 @@ TEST(AlarmTest, SetDestruction) {
EXPECT_EQ(junk, output_tag); EXPECT_EQ(junk, output_tag);
} }
TEST(AlarmTest, CallbackSetDestruction) {
auto c = std::make_shared<Completion>();
{
Alarm alarm;
alarm.experimental().Set(
std::chrono::system_clock::now() + std::chrono::seconds(10),
[c](bool ok) {
EXPECT_FALSE(ok);
std::lock_guard<std::mutex> l(c->mu);
c->completed = true;
c->cv.notify_one();
});
}
std::unique_lock<std::mutex> l(c->mu);
EXPECT_TRUE(c->cv.wait_until(
l, std::chrono::system_clock::now() + std::chrono::seconds(1),
[c] { return c->completed; }));
}
TEST(AlarmTest, UnsetDestruction) { TEST(AlarmTest, UnsetDestruction) {
CompletionQueue cq; CompletionQueue cq;
Alarm alarm; Alarm alarm;

Loading…
Cancel
Save