From 296984b46c5e130c7aed4bdae1fc910108e3cbce Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Nov 2016 12:25:02 -0800 Subject: [PATCH 1/2] Break infinite connection retry loop Previously, we'd keep retrying a connection until the channel closed. With this change, we only retry connecting *if there's a watcher on the subchannel connection state*. This ultimately means that if the lb_policy doesn't care if the subchannel connects, it'll stop trying. --- src/core/ext/client_channel/subchannel.c | 135 ++++++++++++-------- src/core/lib/transport/connectivity_state.c | 13 +- src/core/lib/transport/connectivity_state.h | 5 +- 3 files changed, 95 insertions(+), 58 deletions(-) diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index a148b2a0e1d..bb3d167becd 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -119,9 +119,9 @@ struct grpc_subchannel { gpr_mu mu; /** have we seen a disconnection? */ - int disconnected; + bool disconnected; /** are we connecting */ - int connecting; + bool connecting; /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; @@ -132,7 +132,9 @@ struct grpc_subchannel { /** backoff state */ gpr_backoff backoff_state; /** do we have an active alarm? */ - int have_alarm; + bool have_alarm; + /** have we started the backoff loop */ + bool backoff_begun; /** our alarm */ grpc_timer alarm; }; @@ -264,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); - c->disconnected = 1; + c->disconnected = true; grpc_connector_shutdown(exec_ctx, c->connector); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -386,12 +388,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { &c->connected); } -static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->next_attempt = - gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); -} - grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; @@ -418,6 +414,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, follow_up->cb(exec_ctx, follow_up->cb_arg, error); } +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_subchannel *c = arg; + gpr_mu_lock(&c->mu); + c->have_alarm = 0; + if (c->disconnected) { + error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); + } else { + GRPC_ERROR_REF(error); + } + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); + continue_connect(exec_ctx, c); + gpr_mu_unlock(&c->mu); + } else { + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + } + GRPC_ERROR_UNREF(error); +} + +static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { + if (c->disconnected) { + /* Don't try to connect if we're already disconnected */ + return; + } + + if (c->connecting) { + /* Already connecting: don't restart */ + return; + } + + if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) { + /* Already connected: don't restart */ + return; + } + + if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { + /* Nobody is interested in connecting: so don't just yet */ + return; + } + + c->connecting = true; + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!c->backoff_begun) { + c->backoff_begun = true; + c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); + continue_connect(exec_ctx, c); + } else { + GPR_ASSERT(!c->have_alarm); + c->have_alarm = 1; + gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= + 0) { + gpr_log(GPR_INFO, "Retry immediately"); + } else { + gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", + time_til_next.tv_sec, time_til_next.tv_nsec); + } + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + } +} + void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, @@ -449,13 +512,9 @@ void grpc_subchannel_notify_on_state_change( w->next = &c->root_external_state_watcher; w->prev = w->next->prev; w->next->prev = w->prev->next = w; - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - start_connect(exec_ctx, c); - } + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, + state, &w->closure); + maybe_start_connecting_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } } @@ -575,7 +634,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, Re-evaluate if we really need this. */ gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); - c->connecting = 0; /* setup subchannel watching connected subchannel for changes; subchannel ref @@ -592,28 +650,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, "connected"); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_subchannel *c = arg; - gpr_mu_lock(&c->mu); - c->have_alarm = 0; - if (c->disconnected) { - error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); - } else { - GRPC_ERROR_REF(error); - } - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->next_attempt = - gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); - gpr_mu_unlock(&c->mu); - } else { - gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); - } - GRPC_ERROR_UNREF(error); -} - static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; @@ -621,35 +657,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); + c->connecting = false; if (c->connecting_result.transport != NULL) { publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); - gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + const char *errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= - 0) { - gpr_log(GPR_INFO, "Retry immediately"); - } else { - gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", - time_til_next.tv_sec, time_til_next.tv_nsec); - } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); grpc_error_free_string(errmsg); + + maybe_start_connecting_locked(exec_ctx, c); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); grpc_channel_args_destroy(delete_channel_args); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 89072879d94..4f49d7cf7d7 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *connectivity_state) { + return connectivity_state->watchers != NULL; +} + +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { @@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); - return 0; + return false; } while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; @@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); - return 0; + return false; } w = w->next; } - return 0; + return false; } else { if (tracker->current_state != *current) { *current = tracker->current_state; diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 7a2fa52c103..769c675b797 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_error *associated_error, const char *reason); +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *tracker); + grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker, grpc_error **current_error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that case) */ -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); From 9fbb3387d09e90670fba8e0fae01eff8f393acd9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Nov 2016 12:58:23 -0800 Subject: [PATCH 2/2] Review feedback --- src/core/ext/client_channel/subchannel.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index bb3d167becd..b83c6882dc0 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -372,7 +372,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return grpc_subchannel_index_register(exec_ctx, key, c); } -static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void continue_connect_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -417,7 +418,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); - c->have_alarm = 0; + c->have_alarm = false; if (c->disconnected) { error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); } else { @@ -427,7 +428,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); c->next_attempt = gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); + continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { gpr_mu_unlock(&c->mu); @@ -465,10 +466,10 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); - continue_connect(exec_ctx, c); + continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; + c->have_alarm = true; gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= 0) {