diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 5b1406b4abf..b562167b5f7 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -178,7 +178,7 @@ GRPCAPI void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); -/** Check whether a grpc channel support connectivity watcher */ +/** Check whether a grpc channel supports connectivity watcher */ GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel); /** Create a call given a grpc_channel, in order to call 'method'. All diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 7c52752bd9c..5b696baf819 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -41,7 +41,6 @@ namespace grpc { namespace { -const int kWaitForStateChangeTimeoutMsec = 100; void WatchStateChange(void* arg); } // namespace @@ -51,32 +50,33 @@ void WatchStateChange(void* arg); class ChannelConnectivityWatcher { public: explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0), being_destroyed_(0) {} + : channel_(channel), thd_id_(0), shutting_down_(0) {} void WatchStateChangeImpl() { grpc_connectivity_state state = GRPC_CHANNEL_IDLE; while (state != GRPC_CHANNEL_SHUTDOWN) { - if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { + channel_->WaitForStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME)); + if (gpr_atm_no_barrier_load(&shutting_down_) == 1) { break; } - channel_->WaitForStateChange( - state, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kWaitForStateChangeTimeoutMsec, - GPR_TIMESPAN))); state = channel_->GetState(false); } } void StartWatching() { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + const char* disabled_str = + std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + if (disabled_str == nullptr || strcmp(disabled_str, "1")) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } } + void Shutdown() { gpr_atm_no_barrier_store(&shutting_down_, 1); } + void Destroy() { if (thd_id_ != 0) { - gpr_atm_no_barrier_store(&being_destroyed_, 1); gpr_thd_join(thd_id_); } } @@ -84,7 +84,7 @@ class ChannelConnectivityWatcher { private: Channel* channel_; gpr_thd_id thd_id_; - gpr_atm being_destroyed_; + gpr_atm shutting_down_; }; namespace { @@ -107,8 +107,9 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } Channel::~Channel() { - connectivity_watcher_->Destroy(); + connectivity_watcher_->Shutdown(); grpc_channel_destroy(c_channel_); + connectivity_watcher_->Destroy(); } namespace { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index c1227a5a1c7..1d1e97a8204 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -272,6 +272,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { cq_->Shutdown(); while (cq_->Next(&ignored_tag, &ignored_ok)) ; + stub_.reset(); poll_overrider_.reset(); gpr_tls_set(&g_is_async_end2end_test, 0); grpc_recycle_unused_port(port_);