diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 6696f19d76c..1bdb9ae0de9 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -98,44 +98,69 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed, - gpr_timespec deadline, - CompletionQueue* cq, void* tag) { - grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline, - cq->cq(), tag); +namespace { +class TagSaver GRPC_FINAL : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() GRPC_OVERRIDE {} + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + *tag = tag_; + delete this; + return true; + } + private: + void* tag_; +}; + +template +void NotifyOnStateChangeShared(grpc_channel* channel, + grpc_connectivity_state last_observed, + const T& deadline, + CompletionQueue* cq, void* tag) { + TimePoint deadline_tp(deadline); + TagSaver* tag_saver = new TagSaver(tag); + grpc_channel_watch_connectivity_state( + channel, last_observed, deadline_tp.raw_time(), cq->cq(), tag_saver); } -bool Channel::WaitForStateChange(grpc_connectivity_state last_observed, - gpr_timespec deadline) { +template +bool WaitForStateChangeShared(grpc_channel* channel, + grpc_connectivity_state last_observed, + const T& deadline) { CompletionQueue cq; bool ok = false; void* tag = NULL; - NotifyOnStateChange(last_observed, deadline, &cq, NULL); + NotifyOnStateChangeShared(channel, last_observed, deadline, &cq, NULL); cq.Next(&tag, &ok); GPR_ASSERT(tag == NULL); return ok; } +} // namespace + +void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed, + gpr_timespec deadline, + CompletionQueue* cq, void* tag) { + NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag); +} + +bool Channel::WaitForStateChange(grpc_connectivity_state last_observed, + gpr_timespec deadline) { + return WaitForStateChangeShared(c_channel_, last_observed, deadline); +} + #ifndef GRPC_CXX0X_NO_CHRONO void Channel::NotifyOnStateChange( grpc_connectivity_state last_observed, const std::chrono::system_clock::time_point& deadline, CompletionQueue* cq, void* tag) { - TimePoint deadline_tp(deadline); - grpc_channel_watch_connectivity_state(c_channel_, last_observed, - deadline_tp.raw_time(), cq->cq(), tag); + NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag); } bool Channel::WaitForStateChange( grpc_connectivity_state last_observed, const std::chrono::system_clock::time_point& deadline) { - CompletionQueue cq; - bool ok = false; - void* tag = NULL; - NotifyOnStateChange(last_observed, deadline, &cq, NULL); - cq.Next(&tag, &ok); - GPR_ASSERT(tag == NULL); - return ok; + return WaitForStateChangeShared(c_channel_, last_observed, deadline); } #endif // !GRPC_CXX0X_NO_CHRONO } // namespace grpc diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 3144ca4dc71..8963382a87a 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -831,7 +831,8 @@ TEST_F(End2endTest, HugeResponse) { } namespace { -void ReaderThreadFunc(ClientReaderWriter* stream, gpr_event *ev) { +void ReaderThreadFunc(ClientReaderWriter* stream, + gpr_event *ev) { EchoResponse resp; gpr_event_set(ev, (void*)1); while (stream->Read(&resp)) { @@ -870,6 +871,27 @@ TEST_F(End2endTest, Peer) { EXPECT_TRUE(CheckIsLocalhost(context.peer())); } +TEST_F(End2endTest, ChannelState) { + ResetStub(); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + + CompletionQueue cq; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(10); + // No state change. + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL); + void* tag; + bool ok = true; + cq.Next(&tag, &ok); + EXPECT_FALSE(ok); + + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); + EXPECT_TRUE(channel_->WaitForStateChange( + GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME))); + EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false)); +} + } // namespace testing } // namespace grpc