From 356fff684a63f9503c4ff4d7b22ef7bd20f3aa12 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 17 Aug 2018 14:55:05 -0700 Subject: [PATCH 1/7] Improve documentation on lifetime of message and status --- include/grpcpp/impl/codegen/async_stream.h | 43 +++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index b2134590c3c..cd433e48696 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -64,7 +64,7 @@ class ClientAsyncStreamingInterface { /// earlier call to \a AsyncReaderInterface::Read that yielded a failed /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). /// - /// This function will return when either: + /// The tag will be returned when either: /// - all incoming messages have been read and the server has returned /// a status. /// - the server has returned a non-OK status. @@ -112,6 +112,8 @@ class AsyncWriterInterface { /// Only one write may be outstanding at any given time. This means that /// after calling Write, one must wait to receive \a tag from the completion /// queue BEFORE calling Write again. + /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// /// \param[in] msg The message to be written. @@ -124,6 +126,8 @@ class AsyncWriterInterface { /// Only one write may be outstanding at any given time. This means that /// after calling Write, one must wait to receive \a tag from the completion /// queue BEFORE calling Write again. + /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. /// WriteOptions \a options is used to set the write options of this message. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// @@ -143,6 +147,8 @@ class AsyncWriterInterface { /// and write is initiated. Note that WriteLast can only buffer \a msg up to /// the flow control window size. If \a msg size is larger than the window /// size, it will be sent on wire without buffering. + /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. @@ -629,6 +635,8 @@ class ServerAsyncReaderInterface /// This operation will end when the server has finished sending out initial /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. + /// GRPC doesn't take ownership or a reference to \a msg or \a status, so it + /// is safe to to deallocate once Finish returns. /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. @@ -650,6 +658,9 @@ class ServerAsyncReaderInterface /// metadata (if not sent already), and status, or if some failure occurred /// when trying to do so. /// + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. /// - Note: \a status must have a non-OK code. @@ -697,6 +708,9 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { /// initial and trailing metadata. /// /// Note: \a msg is not sent if \a status has a non-OK code. + /// + /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once Finish returns. void Finish(const W& msg, const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { @@ -723,6 +737,9 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { /// - also sends initial metadata if not alreay sent. /// - uses the \a ServerContext associated with this call to send possible /// initial and trailing metadata. + /// + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. void FinishWithError(const Status& status, void* tag) override { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); @@ -773,6 +790,9 @@ class ServerAsyncWriterInterface /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. /// + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. virtual void Finish(const Status& status, void* tag) = 0; @@ -784,6 +804,9 @@ class ServerAsyncWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish /// in a single step. /// + /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] status The Status that server returns to client. @@ -847,6 +870,9 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { /// for sending trailing (and initial) metadata to the client. /// /// Note: \a status must have an OK code. + /// + /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { write_ops_.set_output_tag(tag); @@ -865,6 +891,9 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { /// /// Note: there are no restrictions are the code of /// \a status,it may be non-OK + /// + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); @@ -924,6 +953,9 @@ class ServerAsyncReaderWriterInterface /// metadata (if not sent already), response message, and status, or if some /// failure occurred when trying to do so. /// + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// /// \param[in] tag Tag identifying this request. /// \param[in] status To be sent to the client as the result of this call. virtual void Finish(const Status& status, void* tag) = 0; @@ -935,6 +967,9 @@ class ServerAsyncReaderWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish in a /// single step. /// + /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] status The Status that server returns to client. @@ -1006,6 +1041,9 @@ class ServerAsyncReaderWriter final /// for sending trailing (and initial) metadata to the client. /// /// Note: \a status must have an OK code. + // + /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { write_ops_.set_output_tag(tag); @@ -1024,6 +1062,9 @@ class ServerAsyncReaderWriter final /// /// Note: there are no restrictions are the code of \a status, /// it may be non-OK + // + /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); From f80af5a7c753f18f0f15ab6b377949fc4b06d69f Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 17 Aug 2018 14:58:01 -0700 Subject: [PATCH 2/7] Formatting --- include/grpcpp/impl/codegen/async_stream.h | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index cd433e48696..5df849f610a 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -112,9 +112,10 @@ class AsyncWriterInterface { /// Only one write may be outstanding at any given time. This means that /// after calling Write, one must wait to receive \a tag from the completion /// queue BEFORE calling Write again. + /// This is thread-safe with respect to \a AsyncReaderInterface::Read + /// /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to /// to deallocate once Write returns. - /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. @@ -126,11 +127,12 @@ class AsyncWriterInterface { /// Only one write may be outstanding at any given time. This means that /// after calling Write, one must wait to receive \a tag from the completion /// queue BEFORE calling Write again. - /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to - /// to deallocate once Write returns. /// WriteOptions \a options is used to set the write options of this message. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// + /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// /// \param[in] msg The message to be written. /// \param[in] options The WriteOptions to be used to write this message. /// \param[in] tag The tag identifying the operation. @@ -147,6 +149,7 @@ class AsyncWriterInterface { /// and write is initiated. Note that WriteLast can only buffer \a msg up to /// the flow control window size. If \a msg size is larger than the window /// size, it will be sent on wire without buffering. + /// /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to /// to deallocate once Write returns. /// @@ -635,6 +638,7 @@ class ServerAsyncReaderInterface /// This operation will end when the server has finished sending out initial /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. + /// /// GRPC doesn't take ownership or a reference to \a msg or \a status, so it /// is safe to to deallocate once Finish returns. /// From 5f2bb7a7d4d90f09423419a1bc8bc35847c7cb4e Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 12 Sep 2018 11:14:18 -0700 Subject: [PATCH 3/7] s/GRPC/gRPC --- include/grpcpp/impl/codegen/async_stream.h | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index 5df849f610a..9c3c40af542 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -114,7 +114,7 @@ class AsyncWriterInterface { /// queue BEFORE calling Write again. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// - /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to /// to deallocate once Write returns. /// /// \param[in] msg The message to be written. @@ -130,7 +130,7 @@ class AsyncWriterInterface { /// WriteOptions \a options is used to set the write options of this message. /// This is thread-safe with respect to \a AsyncReaderInterface::Read /// - /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to /// to deallocate once Write returns. /// /// \param[in] msg The message to be written. @@ -150,7 +150,7 @@ class AsyncWriterInterface { /// the flow control window size. If \a msg size is larger than the window /// size, it will be sent on wire without buffering. /// - /// GRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to /// to deallocate once Write returns. /// /// \param[in] msg The message to be written. @@ -639,7 +639,7 @@ class ServerAsyncReaderInterface /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. /// - /// GRPC doesn't take ownership or a reference to \a msg or \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it /// is safe to to deallocate once Finish returns. /// /// \param[in] tag Tag identifying this request. @@ -662,7 +662,7 @@ class ServerAsyncReaderInterface /// metadata (if not sent already), and status, or if some failure occurred /// when trying to do so. /// - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once FinishWithError returns. /// /// \param[in] tag Tag identifying this request. @@ -713,7 +713,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { /// /// Note: \a msg is not sent if \a status has a non-OK code. /// - /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it /// is safe to to deallocate once Finish returns. void Finish(const W& msg, const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); @@ -742,7 +742,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { /// - uses the \a ServerContext associated with this call to send possible /// initial and trailing metadata. /// - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once FinishWithError returns. void FinishWithError(const Status& status, void* tag) override { GPR_CODEGEN_ASSERT(!status.ok()); @@ -794,7 +794,7 @@ class ServerAsyncWriterInterface /// metadata (if not sent already), response message, and status, or if /// some failure occurred when trying to do so. /// - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once Finish returns. /// /// \param[in] tag Tag identifying this request. @@ -808,7 +808,7 @@ class ServerAsyncWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish /// in a single step. /// - /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it /// is safe to to deallocate once WriteAndFinish returns. /// /// \param[in] msg The message to be written. @@ -875,7 +875,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { /// /// Note: \a status must have an OK code. /// - /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { @@ -896,7 +896,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { /// Note: there are no restrictions are the code of /// \a status,it may be non-OK /// - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); @@ -957,7 +957,7 @@ class ServerAsyncReaderWriterInterface /// metadata (if not sent already), response message, and status, or if some /// failure occurred when trying to do so. /// - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once Finish returns. /// /// \param[in] tag Tag identifying this request. @@ -971,7 +971,7 @@ class ServerAsyncReaderWriterInterface /// WriteAndFinish is equivalent of performing WriteLast and Finish in a /// single step. /// - /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it /// is safe to to deallocate once WriteAndFinish returns. /// /// \param[in] msg The message to be written. @@ -1046,7 +1046,7 @@ class ServerAsyncReaderWriter final /// /// Note: \a status must have an OK code. // - /// GRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it /// is safe to to deallocate once WriteAndFinish returns. void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void* tag) override { @@ -1067,7 +1067,7 @@ class ServerAsyncReaderWriter final /// Note: there are no restrictions are the code of \a status, /// it may be non-OK // - /// GRPC doesn't take ownership or a reference to \a status, so it is safe to + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once Finish returns. void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); From db01bf793aeab78b8b8d85686977240afb56a536 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 11 Sep 2018 17:01:19 -0700 Subject: [PATCH 4/7] Add callback-based alarms --- include/grpcpp/alarm.h | 24 ++++++++ src/cpp/common/alarm.cc | 47 +++++++++++---- test/cpp/common/alarm_test.cc | 106 +++++++++++++++++++++++++++++++++- 3 files changed, 165 insertions(+), 12 deletions(-) diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index f484610a6ed..f9008c327e4 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -21,6 +21,8 @@ #ifndef GRPCPP_ALARM_H #define GRPCPP_ALARM_H +#include + #include #include #include @@ -76,8 +78,30 @@ class Alarm : private GrpcLibraryCodegen { /// has already fired has no effect. void Cancel(); + /// NOTE: class experimental_type is not part of the public API of this class + /// TODO(vjpai): Move these contents to the public API of Alarm when + /// they are no longer experimental + class experimental_type { + public: + explicit experimental_type(Alarm* alarm) : alarm_(alarm) {} + + template + void Set(const T& deadline, std::function f) { + alarm_->SetInternal(TimePoint(deadline).raw_time(), std::move(f)); + } + + private: + Alarm* alarm_; + }; + + /// NOTE: The function experimental() is not stable public API. It is a view + /// to the experimental components of this class. It may be changed or removed + /// at any time. + experimental_type experimental() { return experimental_type(this); } + private: void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag); + void SetInternal(gpr_timespec deadline, std::function f); internal::CompletionQueueTag* alarm_; }; diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 15a373d8a54..5819a4210bd 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag { AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast(arg); - alarm->Ref(); - grpc_cq_end_op( - alarm->cq_, alarm, error, - [](void* arg, grpc_cq_completion* completion) {}, - arg, &alarm->completion_); - }, - this, grpc_schedule_on_exec_ctx); } ~AlarmImpl() { grpc_core::ExecCtx exec_ctx; @@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + grpc_cq_end_op( + alarm->cq_, alarm, error, + [](void* arg, grpc_cq_completion* completion) {}, + arg, &alarm->completion_); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), + &on_alarm_); + } + void Set(gpr_timespec deadline, std::function f) { + grpc_core::ExecCtx exec_ctx; + // Don't use any CQ at all. Instead just use the timer to fire the function + callback_ = std::move(f); + Ref(); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } @@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag { // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; void* tag_; + std::function callback_; }; } // namespace internal @@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { static_cast(alarm_)->Set(cq, deadline, tag); } +void Alarm::SetInternal(gpr_timespec deadline, std::function f) { + // Note that we know that alarm_ is actually an internal::AlarmImpl + // but we declared it as the base pointer to avoid a forward declaration + // or exposing core data structures in the C++ public headers. + // Thus it is safe to use a static_cast to the subclass here, and the + // C++ style guide allows us to do so in this case + static_cast(alarm_)->Set(deadline, std::move(f)); +} + Alarm::~Alarm() { if (alarm_ != nullptr) { static_cast(alarm_)->Destroy(); diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_test.cc index 57d958349ec..e909d03658c 100644 --- a/test/cpp/common/alarm_test.cc +++ b/test/cpp/common/alarm_test.cc @@ -16,9 +16,13 @@ * */ +#include +#include +#include +#include + #include #include -#include #include @@ -43,6 +47,66 @@ TEST(AlarmTest, RegularExpiry) { EXPECT_EQ(junk, output_tag); } +struct Completion { + bool completed = false; + std::mutex mu; + std::condition_variable cv; +}; + +TEST(AlarmTest, CallbackRegularExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(1), [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + +TEST(AlarmTest, CallbackZeroExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set(grpc_timeout_seconds_to_deadline(0), [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + +TEST(AlarmTest, CallbackNegativeExpiry) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(-1), + [c](bool ok) { + EXPECT_TRUE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(10), + [c] { return c->completed; })); +} + TEST(AlarmTest, MultithreadedRegularExpiry) { CompletionQueue cq; void* junk = reinterpret_cast(1618033); @@ -182,6 +246,26 @@ TEST(AlarmTest, Cancellation) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, CallbackCancellation) { + Alarm alarm; + + auto c = std::make_shared(); + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(10), + [c](bool ok) { + EXPECT_FALSE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + alarm.Cancel(); + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(1), + [c] { return c->completed; })); +} + TEST(AlarmTest, SetDestruction) { CompletionQueue cq; void* junk = reinterpret_cast(1618033); @@ -200,6 +284,26 @@ TEST(AlarmTest, SetDestruction) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, CallbackSetDestruction) { + auto c = std::make_shared(); + { + Alarm alarm; + alarm.experimental().Set( + std::chrono::system_clock::now() + std::chrono::seconds(10), + [c](bool ok) { + EXPECT_FALSE(ok); + std::lock_guard l(c->mu); + c->completed = true; + c->cv.notify_one(); + }); + } + + std::unique_lock l(c->mu); + EXPECT_TRUE(c->cv.wait_until( + l, std::chrono::system_clock::now() + std::chrono::seconds(1), + [c] { return c->completed; })); +} + TEST(AlarmTest, UnsetDestruction) { CompletionQueue cq; Alarm alarm; From 1359543b47c0ecff8ecd50ae4d3a2989e16a8a94 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 19 Sep 2018 09:39:21 -0700 Subject: [PATCH 5/7] Add a comment to address reviewer feedback --- include/grpcpp/alarm.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index f9008c327e4..365feb4eb95 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -85,6 +85,9 @@ class Alarm : private GrpcLibraryCodegen { public: explicit experimental_type(Alarm* alarm) : alarm_(alarm) {} + /// Set an alarm to invoke callback \a f. The argument to the callback + /// states whether the alarm expired at \a deadline (true) or was cancelled + /// (false) template void Set(const T& deadline, std::function f) { alarm_->SetInternal(TimePoint(deadline).raw_time(), std::move(f)); From dd10cbc5545f69882847b96d3dc41a64e3ada466 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Wed, 19 Sep 2018 10:28:39 -0700 Subject: [PATCH 6/7] Change force_creation from atm to bool --- .../client_channel/subchannel_index.cc | 6 +-- test/cpp/end2end/client_lb_end2end_test.cc | 41 +++++++++++++------ 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index f2b6c24e8ee..1c23a6c4be4 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -42,7 +42,7 @@ struct grpc_subchannel_key { grpc_subchannel_args args; }; -static gpr_atm g_force_creation = false; +static bool g_force_creation = false; static grpc_subchannel_key* create_key( const grpc_subchannel_args* args, @@ -74,7 +74,7 @@ static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) { int grpc_subchannel_key_compare(const grpc_subchannel_key* a, const grpc_subchannel_key* b) { // To pretend the keys are different, return a non-zero value. - if (GPR_UNLIKELY(gpr_atm_no_barrier_load(&g_force_creation))) return 1; + if (GPR_UNLIKELY(g_force_creation)) return 1; int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { @@ -251,5 +251,5 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, } void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { - gpr_atm_no_barrier_store(&g_force_creation, force_creation); + g_force_creation = force_creation; } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e5d6132012a..a9d68ab0582 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -119,6 +119,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } void SetUp() override { + grpc_init(); response_generator_ = grpc_core::MakeRefCounted(); } @@ -127,6 +128,7 @@ class ClientLbEnd2endTest : public ::testing::Test { for (size_t i = 0; i < servers_.size(); ++i) { servers_[i]->Shutdown(); } + grpc_shutdown(); } void CreateServers(size_t num_servers, @@ -560,7 +562,23 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { +class ClientLbEnd2endWithParamTest + : public ClientLbEnd2endTest, + public ::testing::WithParamInterface { + protected: + void SetUp() override { + grpc_subchannel_index_test_only_set_force_creation(GetParam()); + ClientLbEnd2endTest::SetUp(); + } + + void TearDown() override { + ClientLbEnd2endTest::TearDown(); + grpc_subchannel_index_test_only_set_force_creation(false); + } +}; + +TEST_P(ClientLbEnd2endWithParamTest, PickFirstManyUpdates) { + gpr_log(GPR_INFO, "subchannel force creation: %d", GetParam()); // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); @@ -570,20 +588,21 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { for (size_t i = 0; i < servers_.size(); ++i) { ports.emplace_back(servers_[i]->port_); } - for (const bool force_creation : {true, false}) { - grpc_subchannel_index_test_only_set_force_creation(force_creation); - gpr_log(GPR_INFO, "Force subchannel creation: %d", force_creation); - for (size_t i = 0; i < 1000; ++i) { - std::shuffle(ports.begin(), ports.end(), - std::mt19937(std::random_device()())); - SetNextResolution(ports); - if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); - } + for (size_t i = 0; i < 1000; ++i) { + std::shuffle(ports.begin(), ports.end(), + std::mt19937(std::random_device()())); + SetNextResolution(ports); + // We should re-enter core at the end of the loop to give the resolution + // setting closure a chance to run. + if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); } // Check LB policy name for the channel. EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } +INSTANTIATE_TEST_CASE_P(SubchannelForceCreation, ClientLbEnd2endWithParamTest, + ::testing::Bool()); + TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { // Prepare the ports for up servers and down servers. const int kNumServers = 3; @@ -984,8 +1003,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc_test_init(argc, argv); - grpc_init(); const auto result = RUN_ALL_TESTS(); - grpc_shutdown(); return result; } From deb3126611a513b768052d93a976832985640d9f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 19 Sep 2018 11:29:11 -0700 Subject: [PATCH 7/7] Fix clang-tidy concerns --- src/cpp/common/callback_common.cc | 4 ++-- test/cpp/end2end/client_callback_end2end_test.cc | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc index fa586286d1c..a0c8eeb5160 100644 --- a/src/cpp/common/callback_common.cc +++ b/src/cpp/common/callback_common.cc @@ -66,8 +66,8 @@ class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); GPR_ASSERT(ignored == parent_->ops()); - // Last use of func_ or ok, so ok to move them out for rvalue call above - CatchingCallback(std::move(func_), std::move(ok)); + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); func_ = nullptr; // reset to clear this out for sure grpc_call_unref(call_); diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 3b492090dd1..d8cb44b694a 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -95,6 +95,8 @@ class ClientCallbackEnd2endTest : public ::testing::Test { if (maybe_except) { throw - 1; } +#else + GPR_ASSERT(!maybe_except); #endif }); std::unique_lock l(mu);