Share one monitoring thread between channels

pull/12294/head
Yuchen Zeng 8 years ago
parent bfb4e06e3c
commit b4481a9a13
  1. 3
      include/grpc++/channel.h
  2. 130
      src/cpp/client/channel_cc.cc
  3. 6
      test/cpp/end2end/end2end_test.cc

@ -30,8 +30,6 @@
struct grpc_channel;
namespace grpc {
class ChannelConnectivityWatcher;
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
public CallHook,
@ -73,7 +71,6 @@ class Channel final : public ChannelInterface,
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
std::unique_ptr<ChannelConnectivityWatcher> connectivity_watcher_;
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};

@ -43,8 +43,23 @@
namespace grpc {
namespace {
int kConnectivityCheckIntervalMsec = 100;
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
class TagSaver final : public CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
} // namespace
// Constantly watches channel connectivity status to reconnect a transiently
@ -52,55 +67,80 @@ void WatchStateChange(void* arg);
// support.
class ChannelConnectivityWatcher {
public:
explicit ChannelConnectivityWatcher(Channel* channel)
: channel_(channel), thd_id_(0) {}
ChannelConnectivityWatcher() {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
~ChannelConnectivityWatcher() {
cq_.Shutdown();
if (thd_id_ != 0) {
gpr_thd_join(thd_id_);
}
}
void WatchStateChangeImpl() {
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
bool ok = false;
void* tag = NULL;
while (state != GRPC_CHANNEL_SHUTDOWN) {
channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME),
&cq_, NULL);
while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) ==
CompletionQueue::TIMEOUT) {
CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
while (status != CompletionQueue::SHUTDOWN) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (status == CompletionQueue::TIMEOUT) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
}
if (status == CompletionQueue::TIMEOUT) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(kConnectivityCheckIntervalMsec,
gpr_time_from_millis(kConnectivityCheckIntervalMsec,
GPR_TIMESPAN)));
} else if (status == CompletionQueue::GOT_EVENT) {
ChannelState* channel_state = static_cast<ChannelState*>(tag);
channel_state->state = grpc_channel_check_connectivity_state(
channel_state->channel, false);
if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
void* shutdown_tag = NULL;
channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
delete channel_state;
} else {
TagSaver* tag_saver = new TagSaver(channel_state);
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
}
}
state = channel_->GetState(false);
}
}
void StartWatching() {
void StartWatching(grpc_channel* channel) {
const char* disabled_str =
std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
// This NotifyOnstateChange() is not used to monitor the channel state
// change, but to hold a reference of the c channel. So that
// WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN
// without holding any lock on the channel object.
channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE,
gpr_inf_future(GPR_CLOCK_REALTIME),
&shutdown_cq_, NULL);
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
// c channel. So that WatchStateChangeImpl() can observe state ==
// GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object.
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
new TagSaver(nullptr));
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
new TagSaver(channel_state));
}
}
void Destroy() {
if (thd_id_ != 0) {
gpr_thd_join(thd_id_);
}
bool ok = false;
void* tag = NULL;
shutdown_cq_.Next(&tag, &ok);
}
private:
Channel* channel_;
struct ChannelState {
explicit ChannelState(grpc_channel* channel)
: channel(channel), state(GRPC_CHANNEL_IDLE){};
grpc_channel* channel;
grpc_connectivity_state state;
CompletionQueue shutdown_cq;
};
gpr_thd_id thd_id_;
CompletionQueue cq_;
CompletionQueue shutdown_cq_;
@ -112,22 +152,21 @@ void WatchStateChange(void* arg) {
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
ChannelConnectivityWatcher channel_connectivity_watcher;
} // namespace
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: connectivity_watcher_(new ChannelConnectivityWatcher(this)),
host_(host),
c_channel_(channel) {
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
connectivity_watcher_->StartWatching();
channel_connectivity_watcher.StartWatching(channel);
}
}
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
connectivity_watcher_->Destroy();
}
namespace {
@ -213,23 +252,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
namespace {
class TagSaver final : public CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
} // namespace
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {

@ -704,10 +704,10 @@ TEST_P(End2endTest, ReconnectChannel) {
ResetStub();
SendRpc(stub_.get(), 1, false);
RestartServer(std::shared_ptr<AuthMetadataProcessor>());
// It needs more than 2 * kConnectivityCheckIntervalMsec time to reconnect
// the channel
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(210, GPR_TIMESPAN)));
gpr_time_from_millis(510, GPR_TIMESPAN)));
SendRpc(stub_.get(), 1, false);
}

Loading…
Cancel
Save