Add ChannelConnectivityWatcher::Ref()/Unref()

pull/12294/head
Yuchen Zeng 7 years ago
parent 6514a0df72
commit 4a11ecc076
  1. 2
      src/core/lib/iomgr/iomgr.c
  2. 32
      src/cpp/client/channel_cc.cc
  3. 4
      test/cpp/end2end/async_end2end_test.cc

@ -165,6 +165,6 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
bool should_we = gpr_is_true(env);
grp_free(env);
gpr_free(env);
return should_we;
}

@ -71,16 +71,7 @@ class ChannelConnectivityWatcher {
public:
static void StartWatching(grpc_channel* channel) {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = false;
if (env != nullptr) {
static const char* truthy[] = {"yes", "true", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == gpr_stricmp(env, truthy[i])) {
disabled = true;
break;
}
}
}
bool disabled = gpr_is_true(env);
gpr_free(env);
if (!disabled) {
gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce);
@ -125,11 +116,7 @@ class ChannelConnectivityWatcher {
void* shutdown_tag = NULL;
channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
delete channel_state;
if (gpr_unref(&ref_)) {
gpr_mu_lock(&g_watcher_mu_);
delete g_watcher_;
g_watcher_ = nullptr;
gpr_mu_unlock(&g_watcher_mu_);
if (Unref()) {
break;
}
} else {
@ -143,7 +130,7 @@ class ChannelConnectivityWatcher {
void StartWatchingLocked(grpc_channel* channel) {
if (thd_id_ != 0) {
gpr_ref(&ref_);
Ref();
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
@ -160,6 +147,19 @@ class ChannelConnectivityWatcher {
}
}
void Ref() { gpr_ref(&ref_); }
bool Unref() {
if (gpr_unref(&ref_)) {
gpr_mu_lock(&g_watcher_mu_);
delete g_watcher_;
g_watcher_ = nullptr;
gpr_mu_unlock(&g_watcher_mu_);
return true;
}
return false;
}
static void InitOnce() { gpr_mu_init(&g_watcher_mu_); }
friend void WatchStateChange(void* arg);

@ -378,6 +378,10 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
while (cq_->Next(&ignored_tag, &ignored_ok))
;
BuildAndStartServer();
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(510, GPR_TIMESPAN)));
SendRpc(1);
}

Loading…
Cancel
Save