diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index 3849240574f..c50091d6ac1 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -30,8 +30,6 @@ struct grpc_channel; namespace grpc { -class ChannelConnectivityWatcher; - /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, public CallHook, @@ -73,7 +71,6 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; - std::unique_ptr connectivity_watcher_; const grpc::string host_; grpc_channel* const c_channel_; // owned }; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ffe88df7327..f06d25b0d9b 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -43,8 +43,23 @@ namespace grpc { namespace { -int kConnectivityCheckIntervalMsec = 100; +int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + } // namespace // Constantly watches channel connectivity status to reconnect a transiently @@ -52,55 +67,80 @@ void WatchStateChange(void* arg); // support. class ChannelConnectivityWatcher { public: - explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0) {} + ChannelConnectivityWatcher() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + ~ChannelConnectivityWatcher() { + cq_.Shutdown(); + if (thd_id_ != 0) { + gpr_thd_join(thd_id_); + } + } void WatchStateChangeImpl() { - grpc_connectivity_state state = GRPC_CHANNEL_IDLE; bool ok = false; void* tag = NULL; - while (state != GRPC_CHANNEL_SHUTDOWN) { - channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME), - &cq_, NULL); - while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) == - CompletionQueue::TIMEOUT) { + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (status != CompletionQueue::SHUTDOWN) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + } + if (status == CompletionQueue::TIMEOUT) { gpr_sleep_until( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(kConnectivityCheckIntervalMsec, + gpr_time_from_millis(kConnectivityCheckIntervalMsec, GPR_TIMESPAN))); + } else if (status == CompletionQueue::GOT_EVENT) { + ChannelState* channel_state = static_cast(tag); + channel_state->state = grpc_channel_check_connectivity_state( + channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } } - state = channel_->GetState(false); } } - void StartWatching() { + void StartWatching(grpc_channel* channel) { const char* disabled_str = std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); if (disabled_str == nullptr || strcmp(disabled_str, "1")) { - // This NotifyOnstateChange() is not used to monitor the channel state - // change, but to hold a reference of the c channel. So that - // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN - // without holding any lock on the channel object. - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, - gpr_inf_future(GPR_CLOCK_REALTIME), - &shutdown_cq_, NULL); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object. + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); } } - void Destroy() { - if (thd_id_ != 0) { - gpr_thd_join(thd_id_); - } - bool ok = false; - void* tag = NULL; - shutdown_cq_.Next(&tag, &ok); - } - private: - Channel* channel_; + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; gpr_thd_id thd_id_; CompletionQueue cq_; CompletionQueue shutdown_cq_; @@ -112,22 +152,21 @@ void WatchStateChange(void* arg) { static_cast(arg); watcher->WatchStateChangeImpl(); } + +ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : connectivity_watcher_(new ChannelConnectivityWatcher(this)), - host_(host), - c_channel_(channel) { + : host_(host), c_channel_(channel) { g_gli_initializer.summon(); if (grpc_channel_support_connectivity_watcher(channel)) { - connectivity_watcher_->StartWatching(); + channel_connectivity_watcher.StartWatching(channel); } } Channel::~Channel() { grpc_channel_destroy(c_channel_); - connectivity_watcher_->Destroy(); } namespace { @@ -213,23 +252,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f316dd09404..9954f9f9acf 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -704,10 +704,10 @@ TEST_P(End2endTest, ReconnectChannel) { ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr()); - // It needs more than 2 * kConnectivityCheckIntervalMsec time to reconnect - // the channel + // It needs more than kConnectivityCheckIntervalMsec time to reconnect the + // channel. gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(210, GPR_TIMESPAN))); + gpr_time_from_millis(510, GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); }