From ff3ae687e1e85d4fb29024c20a17595dce05e51f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Jun 2015 17:44:04 -0700 Subject: [PATCH] Add connect retry, backoff --- src/core/client_config/subchannel.c | 50 ++++++- src/core/client_config/subchannel.h | 2 + src/core/surface/server.c | 180 ++++++++++++------------ src/core/transport/chttp2/internal.h | 3 +- src/core/transport/chttp2_transport.c | 38 +++-- src/core/transport/connectivity_state.c | 17 ++- src/core/transport/connectivity_state.h | 4 + 7 files changed, 182 insertions(+), 112 deletions(-) diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 19ec1c0b4ba..c8c562f29d8 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/iomgr/alarm.h" #include "src/core/transport/connectivity_state.h" typedef struct { @@ -108,6 +109,15 @@ struct grpc_subchannel { waiting_for_connect *waiting; /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; + + /** next connect attempt time */ + gpr_timespec next_attempt; + /** amount to backoff each failure */ + gpr_timespec backoff_delta; + /** do we have an active alarm? */ + int have_alarm; + /** our alarm */ + grpc_alarm alarm; }; struct grpc_subchannel_call { @@ -259,7 +269,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -static void start_connect(grpc_subchannel *c) { +static void continue_connect(grpc_subchannel *c) { grpc_connect_in_args args; args.interested_parties = &c->pollset_set; @@ -273,6 +283,14 @@ static void start_connect(grpc_subchannel *c) { &c->connected); } +static void start_connect(grpc_subchannel *c) { + gpr_timespec now = gpr_now(); + c->next_attempt = now; + c->backoff_delta = gpr_time_from_seconds(1); + + continue_connect(c); +} + static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, @@ -350,10 +368,14 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { connection *con = NULL; grpc_subchannel *destroy; + int cancel_alarm = 0; gpr_mu_lock(&c->mu); if (op->disconnect) { c->disconnected = 1; connectivity_state_changed_locked(c); + if (c->have_alarm) { + cancel_alarm = 1; + } } if (c->active != NULL) { con = c->active; @@ -373,6 +395,10 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, subchannel_destroy(destroy); } } + + if (cancel_alarm) { + grpc_alarm_cancel(&c->alarm); + } } static void on_state_changed(void *p, int iomgr_success) { @@ -528,18 +554,30 @@ static void publish_transport(grpc_subchannel *c) { } } +static void on_alarm(void *arg, int iomgr_success) { + grpc_subchannel *c = arg; + gpr_mu_lock(&c->mu); + c->have_alarm = 0; + gpr_mu_unlock(&c->mu); + if (iomgr_success) { + continue_connect(c); + } else { + GRPC_SUBCHANNEL_UNREF(c, "connecting"); + } +} + static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { publish_transport(c); } else { - int destroy; gpr_mu_lock(&c->mu); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting"); + GPR_ASSERT(!c->have_alarm); + c->have_alarm = 1; + c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); + c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now()); gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c); - /* TODO(ctiller): retry after sleeping */ - abort(); } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 5435ef703b3..03bd4f63e01 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -43,6 +43,8 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; +#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG + #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f7d385c7afe..383c3d921dd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -202,18 +202,86 @@ struct call_data { call_link links[CALL_LIST_COUNT]; }; +typedef struct { + grpc_channel **channels; + size_t num_channels; +} channel_broadcaster; + #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static void shutdown_channel(channel_data *chand, int send_goaway, - int send_disconnect); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_server *server); +/* channel broadcaster */ + +/* assumes server locked */ +static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { + channel_data *c; + size_t count = 0; + for (c = s->root_channel_data.next; c != &s->root_channel_data; + c = c->next) { + count ++; + } + cb->num_channels = count; + cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); + count = 0; + for (c = s->root_channel_data.next; c != &s->root_channel_data; + c = c->next) { + cb->channels[count] = c->channel; + GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); + count ++; + } +} + +struct shutdown_cleanup_args { + grpc_iomgr_closure closure; + gpr_slice slice; +}; + +static void shutdown_cleanup(void *arg, int iomgr_status_ignored) { + struct shutdown_cleanup_args *a = arg; + gpr_slice_unref(a->slice); + gpr_free(a); +} + +static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disconnect) { + grpc_transport_op op; + struct shutdown_cleanup_args *sc; + grpc_channel_element *elem; + + memset(&op, 0, sizeof(op)); + gpr_log(GPR_DEBUG, "send_goaway:%d", send_goaway); + op.send_goaway = send_goaway; + sc = gpr_malloc(sizeof(*sc)); + sc->slice = gpr_slice_from_copied_string("Server shutdown"); + op.goaway_message = &sc->slice; + op.goaway_status = GRPC_STATUS_OK; + op.disconnect = send_disconnect; + grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc); + op.on_consumed = &sc->closure; + + elem = grpc_channel_stack_element( + grpc_channel_get_channel_stack(channel), 0); + elem->filter->start_transport_op(elem, &op); +} + +static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int send_disconnect) { + size_t i; + + for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], send_goaway, send_disconnect); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); + } + gpr_free(cb->channels); +} + +/* call list */ + static int call_list_join(call_data **root, call_data *call, call_list list) { GPR_ASSERT(!call->root[list]); call->root[list] = root; @@ -458,12 +526,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void decrement_call_count(channel_data *chand) { +static int decrement_call_count(channel_data *chand) { + int disconnect = 0; chand->num_calls--; if (0 == chand->num_calls && chand->server->shutdown) { - shutdown_channel(chand, 0, 1); + disconnect = 1; } maybe_finish_shutdown(chand->server); + return disconnect; } static void server_on_recv(void *ptr, int success) { @@ -471,6 +541,7 @@ static void server_on_recv(void *ptr, int success) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int remove_res; + int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -519,9 +590,16 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_unlock(&chand->server->mu_call); gpr_mu_lock(&chand->server->mu_global); if (remove_res) { - decrement_call_count(chand); + disconnect = decrement_call_count(chand); + if (disconnect) { + GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); + } } gpr_mu_unlock(&chand->server->mu_global); + if (disconnect) { + send_shutdown(chand->channel, 0, 1); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); + } break; } @@ -575,89 +653,6 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { } } -#if 0 -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - - switch (op->type) { - case GRPC_ACCEPT_CALL: - /* create a call */ - grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data, NULL, 0, - gpr_inf_future); - break; - case GRPC_TRANSPORT_CLOSED: - /* if the transport is closed for a server channel, we destroy the - channel */ - gpr_mu_lock(&server->mu_global); - server_ref(server); - destroy_channel(chand); - gpr_mu_unlock(&server->mu_global); - server_unref(server); - break; - case GRPC_TRANSPORT_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_channel_next_op(elem, op); - break; - } -} -#endif - -typedef struct { - channel_data *chand; - int send_goaway; - int send_disconnect; - grpc_iomgr_closure finish_shutdown_channel_closure; - - /* for use during shutdown: the goaway message to send */ - gpr_slice goaway_message; -} shutdown_channel_args; - -static void destroy_shutdown_channel_args(void *p, int success) { - shutdown_channel_args *sca = p; - GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown"); - gpr_slice_unref(sca->goaway_message); - gpr_free(sca); -} - -static void finish_shutdown_channel(void *p, int success) { - shutdown_channel_args *sca = p; - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - - op.send_goaway = sca->send_goaway; - sca->goaway_message = gpr_slice_from_copied_string("Server shutdown"); - op.goaway_message = &sca->goaway_message; - op.goaway_status = GRPC_STATUS_OK; - op.disconnect = sca->send_disconnect; - grpc_iomgr_closure_init(&sca->finish_shutdown_channel_closure, - destroy_shutdown_channel_args, sca); - op.on_consumed = &sca->finish_shutdown_channel_closure; - - grpc_channel_next_op( - grpc_channel_stack_element( - grpc_channel_get_channel_stack(sca->chand->channel), 0), - &op); -} - -static void shutdown_channel(channel_data *chand, int send_goaway, - int send_disconnect) { - shutdown_channel_args *sca; - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown"); - sca = gpr_malloc(sizeof(shutdown_channel_args)); - sca->chand = chand; - sca->send_goaway = send_goaway; - sca->send_disconnect = send_disconnect; - sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel; - sca->finish_shutdown_channel_closure.cb_arg = sca; - grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure); -} - static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, grpc_transport_stream_op *initial_op) { @@ -969,10 +964,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; requested_call_array requested_calls; - channel_data *c; size_t i; registered_method *rm; shutdown_tag *sdt; + channel_broadcaster broadcaster; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); @@ -988,10 +983,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, return; } - for (c = server->root_channel_data.next; c != &server->root_channel_data; - c = c->next) { - shutdown_channel(c, 1, c->num_calls == 0); - } + channel_broadcaster_init(server, &broadcaster); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); @@ -1029,6 +1021,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, for (l = server->listeners; l; l = l->next) { l->destroy(server, l->arg); } + + channel_broadcaster_shutdown(&broadcaster, 1, 0); } void grpc_server_listener_destroy_done(void *s) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7e2e75f97d2..c8c46f0e544 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -160,7 +160,8 @@ typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** queued callbacks */ - grpc_iomgr_closure *pending_closures; + grpc_iomgr_closure *pending_closures_head; + grpc_iomgr_closure *pending_closures_tail; /** window available for us to send to peer */ gpr_uint32 outgoing_window; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 8f909dff374..08a767f1d5b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -117,6 +117,8 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global); +static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -328,7 +330,7 @@ static void destroy_transport(grpc_transport *gt) { static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; - grpc_connectivity_state_set(&t->channel_callback.state_tracker, + connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE); if (t->ep) { grpc_endpoint_shutdown(t->ep); @@ -451,8 +453,9 @@ static void unlock(grpc_chttp2_transport *t) { grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } - run_closures = t->global.pending_closures; - t->global.pending_closures = NULL; + run_closures = t->global.pending_closures_head; + t->global.pending_closures_head = NULL; + t->global.pending_closures_tail = NULL; gpr_mu_unlock(&t->mu); @@ -523,8 +526,8 @@ void grpc_chttp2_add_incoming_goaway( gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; - grpc_connectivity_state_set( - &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + connectivity_state_set( + transport_global, GRPC_CHANNEL_FATAL_FAILURE); } @@ -550,8 +553,7 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global) - ->channel_callback.state_tracker, + connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE); } @@ -933,12 +935,30 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ +static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closure) { + grpc_chttp2_schedule_closure(a, closure, 1); +} + +static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) { + grpc_connectivity_state_set_with_scheduler( + &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + state, + schedule_closure_for_connectivity, + transport_global); +} + void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success) { closure->success = success; - closure->next = transport_global->pending_closures; - transport_global->pending_closures = closure; + if (transport_global->pending_closures_tail == NULL) { + transport_global->pending_closures_head = + transport_global->pending_closures_tail = closure; + } else { + transport_global->pending_closures_tail->next = closure; + transport_global->pending_closures_tail = closure; + } + closure->next = NULL; } /* diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 8df08af32f3..9a956a5a588 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -79,8 +79,10 @@ int grpc_connectivity_state_notify_on_state_change( return tracker->current_state == GRPC_CHANNEL_IDLE; } -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state state) { +void grpc_connectivity_state_set_with_scheduler( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), + void *arg) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; /*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/ @@ -93,7 +95,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, if (state != *w->current) { *w->current = state; - grpc_iomgr_add_callback(w->notify); + scheduler(arg, w->notify); gpr_free(w); } else { w->next = new; @@ -102,3 +104,12 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, } tracker->watchers = new; } + +static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) { + grpc_iomgr_add_callback(closure); +} + +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state) { + grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, NULL); +} diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 9a8c57525f0..c6f903a1ea6 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -59,6 +59,10 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state); +void grpc_connectivity_state_set_with_scheduler( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), + void *arg); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker);