From be9b81424075fbd9bc77956c3eb882929b7bb5a8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 4 Aug 2017 10:42:03 -0700 Subject: [PATCH] Add ChannelConnectivityWatcher --- include/grpc++/channel.h | 5 +++ src/cpp/client/channel_cc.cc | 64 ++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index c50091d6ac1..73f28a182c7 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -29,6 +29,10 @@ struct grpc_channel; +namespace grpc { +class ChannelConnectivityWatcher; +} + namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, @@ -71,6 +75,7 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; + std::unique_ptr connectivity_watcher_; const grpc::string host_; grpc_channel* const c_channel_; // owned }; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c95..27bd75f3cde 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -35,17 +35,77 @@ #include #include #include +#include #include "src/core/lib/profiling/timers.h" namespace grpc { +namespace { +void WatchStateChange(void* arg); +} // namespace + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. +class ChannelConnectivityWatcher { + public: + ChannelConnectivityWatcher(Channel* channel) + : channel_(channel), thd_id_(0), being_destroyed_(0) {} + + void WatchStateChangeImpl() { + grpc_connectivity_state state = GRPC_CHANNEL_IDLE; + while (state != GRPC_CHANNEL_SHUTDOWN) { + if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { + break; + } + channel_->WaitForStateChange( + state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(1, GPR_TIMESPAN))); + state = channel_->GetState(false); + } + } + void StartWatching() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + void Destroy() { + if (thd_id_ != 0) { + gpr_atm_no_barrier_store(&being_destroyed_, 1); + gpr_thd_join(thd_id_); + } + } + + private: + Channel* channel_; + gpr_thd_id thd_id_; + gpr_atm being_destroyed_; +}; + +namespace { +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : host_(host), c_channel_(channel) { + : connectivity_watcher_(new ChannelConnectivityWatcher(this)), + host_(host), + c_channel_(channel) { g_gli_initializer.summon(); + if (host != "inproc") { + connectivity_watcher_->StartWatching(); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + connectivity_watcher_->Destroy(); + grpc_channel_destroy(c_channel_); +} namespace {