From b6fbf1d986331c8959e60a172fe922d9f864b03f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Jun 2015 15:25:49 -0700 Subject: [PATCH] Fix refcounting --- src/core/channel/client_channel.c | 1 - src/core/client_config/subchannel.c | 50 ++++++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index d3d2f42ec1c..6b60dc07cf7 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -402,7 +402,6 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); destroy_resolver = chand->resolver; chand->resolver = NULL; - op->disconnect = 0; } if (!is_empty(op, sizeof(*op))) { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 3d065761ab9..eadeb0ef558 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -80,6 +80,8 @@ struct grpc_subchannel { grpc_mdctx *mdctx; /** master channel */ grpc_channel *master; + /** have we seen a disconnection? */ + int disconnected; /** set during connection */ grpc_connect_out_args connecting_result; @@ -152,6 +154,7 @@ static void subchannel_destroy(grpc_subchannel *c); static void connection_destroy(connection *c) { GPR_ASSERT(c->refs == 0); + gpr_log(GPR_DEBUG, "CONNECTION_DESTROY %p", c); grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } @@ -342,8 +345,32 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { - gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented"); - abort(); + connection *con = NULL; + grpc_subchannel *destroy; + gpr_mu_lock(&c->mu); + if (op->disconnect) { + c->disconnected = 1; + grpc_connectivity_state_set(&c->state_tracker, + compute_connectivity_locked(c)); + } + if (c->active != NULL) { + con = c->active; + CONNECTION_REF_LOCKED(con, "transport-op"); + } + gpr_mu_unlock(&c->mu); + + if (con != NULL) { + grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); + top_elem->filter->start_transport_op(top_elem, op); + + gpr_mu_lock(&c->mu); + destroy = CONNECTION_UNREF_LOCKED(con, "transport-op"); + gpr_mu_unlock(&c->mu); + if (destroy) { + subchannel_destroy(destroy); + } + } } static void on_state_changed(void *p, int iomgr_success) { @@ -388,7 +415,7 @@ static void on_state_changed(void *p, int iomgr_success) { case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things are starting to go wrong, reconnect but don't deactivate */ /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connection"); + SUBCHANNEL_REF_LOCKED(c, "connecting"); do_connect = 1; c->connecting = 1; break; @@ -397,7 +424,7 @@ static void on_state_changed(void *p, int iomgr_success) { done: grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); + destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); if (do_connect) { @@ -450,6 +477,14 @@ static void publish_transport(grpc_subchannel *c) { gpr_mu_lock(&c->mu); + if (c->disconnected) { + gpr_mu_unlock(&c->mu); + gpr_free(sw); + gpr_free(filters); + grpc_channel_stack_destroy(stk); + return; + } + /* publish */ if (c->active != NULL && c->active->refs == 0) { destroy_connection = c->active; @@ -464,6 +499,8 @@ static void publish_transport(grpc_subchannel *c) { memset(&op, 0, sizeof(op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; + SUBCHANNEL_REF_LOCKED(c, "state_watcher"); + GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); elem->filter->start_transport_op(elem, &op); @@ -491,7 +528,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { } else { int destroy; gpr_mu_lock(&c->mu); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); + destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting"); gpr_mu_unlock(&c->mu); if (destroy) subchannel_destroy(c); /* TODO(ctiller): retry after sleeping */ @@ -504,6 +541,9 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { } static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { + if (c->disconnected) { + return GRPC_CHANNEL_FATAL_FAILURE; + } if (c->connecting) { return GRPC_CHANNEL_CONNECTING; }