Avoid using timers

pull/12294/head
Yuchen Zeng 8 years ago
parent 6a6d618034
commit a2e506e8d9
  1. 2
      include/grpc/grpc.h
  2. 23
      src/cpp/client/channel_cc.cc
  3. 1
      test/cpp/end2end/async_end2end_test.cc

@ -178,7 +178,7 @@ GRPCAPI void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state, grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag); gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
/** Check whether a grpc channel support connectivity watcher */ /** Check whether a grpc channel supports connectivity watcher */
GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel); GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel);
/** Create a call given a grpc_channel, in order to call 'method'. All /** Create a call given a grpc_channel, in order to call 'method'. All

@ -41,7 +41,6 @@
namespace grpc { namespace grpc {
namespace { namespace {
const int kWaitForStateChangeTimeoutMsec = 100;
void WatchStateChange(void* arg); void WatchStateChange(void* arg);
} // namespace } // namespace
@ -51,32 +50,33 @@ void WatchStateChange(void* arg);
class ChannelConnectivityWatcher { class ChannelConnectivityWatcher {
public: public:
explicit ChannelConnectivityWatcher(Channel* channel) explicit ChannelConnectivityWatcher(Channel* channel)
: channel_(channel), thd_id_(0), being_destroyed_(0) {} : channel_(channel), thd_id_(0), shutting_down_(0) {}
void WatchStateChangeImpl() { void WatchStateChangeImpl() {
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
while (state != GRPC_CHANNEL_SHUTDOWN) { while (state != GRPC_CHANNEL_SHUTDOWN) {
if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { channel_->WaitForStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME));
if (gpr_atm_no_barrier_load(&shutting_down_) == 1) {
break; break;
} }
channel_->WaitForStateChange(
state,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(kWaitForStateChangeTimeoutMsec,
GPR_TIMESPAN)));
state = channel_->GetState(false); state = channel_->GetState(false);
} }
} }
void StartWatching() { void StartWatching() {
const char* disabled_str =
std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options); gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
} }
}
void Shutdown() { gpr_atm_no_barrier_store(&shutting_down_, 1); }
void Destroy() { void Destroy() {
if (thd_id_ != 0) { if (thd_id_ != 0) {
gpr_atm_no_barrier_store(&being_destroyed_, 1);
gpr_thd_join(thd_id_); gpr_thd_join(thd_id_);
} }
} }
@ -84,7 +84,7 @@ class ChannelConnectivityWatcher {
private: private:
Channel* channel_; Channel* channel_;
gpr_thd_id thd_id_; gpr_thd_id thd_id_;
gpr_atm being_destroyed_; gpr_atm shutting_down_;
}; };
namespace { namespace {
@ -107,8 +107,9 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
} }
Channel::~Channel() { Channel::~Channel() {
connectivity_watcher_->Destroy(); connectivity_watcher_->Shutdown();
grpc_channel_destroy(c_channel_); grpc_channel_destroy(c_channel_);
connectivity_watcher_->Destroy();
} }
namespace { namespace {

@ -272,6 +272,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
cq_->Shutdown(); cq_->Shutdown();
while (cq_->Next(&ignored_tag, &ignored_ok)) while (cq_->Next(&ignored_tag, &ignored_ok))
; ;
stub_.reset();
poll_overrider_.reset(); poll_overrider_.reset();
gpr_tls_set(&g_is_async_end2end_test, 0); gpr_tls_set(&g_is_async_end2end_test, 0);
grpc_recycle_unused_port(port_); grpc_recycle_unused_port(port_);

Loading…
Cancel
Save