Add ChannelConnectivityWatcher

pull/12294/head
Yuchen Zeng 8 years ago
parent ff0996d02b
commit be9b814240
  1. 5
      include/grpc++/channel.h
  2. 64
      src/cpp/client/channel_cc.cc

@ -29,6 +29,10 @@
struct grpc_channel;
namespace grpc {
class ChannelConnectivityWatcher;
}
namespace grpc {
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
@ -71,6 +75,7 @@ class Channel final : public ChannelInterface,
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
std::unique_ptr<ChannelConnectivityWatcher> connectivity_watcher_;
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};

@ -35,17 +35,77 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include "src/core/lib/profiling/timers.h"
namespace grpc {
namespace {
void WatchStateChange(void* arg);
} // namespace
// Constantly watches channel connectivity status to reconnect a transiently
// disconnected channel. This is a temporary work-around before we have retry
// support.
class ChannelConnectivityWatcher {
public:
ChannelConnectivityWatcher(Channel* channel)
: channel_(channel), thd_id_(0), being_destroyed_(0) {}
void WatchStateChangeImpl() {
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
while (state != GRPC_CHANNEL_SHUTDOWN) {
if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) {
break;
}
channel_->WaitForStateChange(
state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(1, GPR_TIMESPAN)));
state = channel_->GetState(false);
}
}
void StartWatching() {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
void Destroy() {
if (thd_id_ != 0) {
gpr_atm_no_barrier_store(&being_destroyed_, 1);
gpr_thd_join(thd_id_);
}
}
private:
Channel* channel_;
gpr_thd_id thd_id_;
gpr_atm being_destroyed_;
};
namespace {
void WatchStateChange(void* arg) {
ChannelConnectivityWatcher* watcher =
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
} // namespace
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
: connectivity_watcher_(new ChannelConnectivityWatcher(this)),
host_(host),
c_channel_(channel) {
g_gli_initializer.summon();
if (host != "inproc") {
connectivity_watcher_->StartWatching();
}
}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Channel::~Channel() {
connectivity_watcher_->Destroy();
grpc_channel_destroy(c_channel_);
}
namespace {

Loading…
Cancel
Save