Remove atm in ChannelConnectivityWatcher

pull/12294/head
Yuchen Zeng 8 years ago
parent a2e506e8d9
commit 4d88416c11
  1. 27
      src/cpp/client/channel_cc.cc

@ -50,15 +50,16 @@ void WatchStateChange(void* arg);
class ChannelConnectivityWatcher { class ChannelConnectivityWatcher {
public: public:
explicit ChannelConnectivityWatcher(Channel* channel) explicit ChannelConnectivityWatcher(Channel* channel)
: channel_(channel), thd_id_(0), shutting_down_(0) {} : channel_(channel), thd_id_(0) {}
void WatchStateChangeImpl() { void WatchStateChangeImpl() {
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
bool ok = false;
void* tag = NULL;
while (state != GRPC_CHANNEL_SHUTDOWN) { while (state != GRPC_CHANNEL_SHUTDOWN) {
channel_->WaitForStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME)); channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME),
if (gpr_atm_no_barrier_load(&shutting_down_) == 1) { &cq_, NULL);
break; cq_.Next(&tag, &ok);
}
state = channel_->GetState(false); state = channel_->GetState(false);
} }
} }
@ -67,24 +68,33 @@ class ChannelConnectivityWatcher {
const char* disabled_str = const char* disabled_str =
std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
if (disabled_str == nullptr || strcmp(disabled_str, "1")) { if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
// This NotifyOnstateChange() is not used to monitor the channel state
// change, but to hold a reference of the c channel. So that
// WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN
// without holding any lock on the channel object.
channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE,
gpr_inf_future(GPR_CLOCK_REALTIME),
&shutdown_cq_, NULL);
gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options); gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
} }
} }
void Shutdown() { gpr_atm_no_barrier_store(&shutting_down_, 1); }
void Destroy() { void Destroy() {
if (thd_id_ != 0) { if (thd_id_ != 0) {
gpr_thd_join(thd_id_); gpr_thd_join(thd_id_);
bool ok = false;
void* tag = NULL;
shutdown_cq_.Next(&tag, &ok);
} }
} }
private: private:
Channel* channel_; Channel* channel_;
gpr_thd_id thd_id_; gpr_thd_id thd_id_;
gpr_atm shutting_down_; CompletionQueue cq_;
CompletionQueue shutdown_cq_;
}; };
namespace { namespace {
@ -107,7 +117,6 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
} }
Channel::~Channel() { Channel::~Channel() {
connectivity_watcher_->Shutdown();
grpc_channel_destroy(c_channel_); grpc_channel_destroy(c_channel_);
connectivity_watcher_->Destroy(); connectivity_watcher_->Destroy();
} }

Loading…
Cancel
Save