Fix refcounting

pull/2303/head
Craig Tiller 10 years ago
parent c396753a36
commit b6fbf1d986
  1. 1
      src/core/channel/client_channel.c
  2. 50
      src/core/client_config/subchannel.c

@ -402,7 +402,6 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
destroy_resolver = chand->resolver; destroy_resolver = chand->resolver;
chand->resolver = NULL; chand->resolver = NULL;
op->disconnect = 0;
} }
if (!is_empty(op, sizeof(*op))) { if (!is_empty(op, sizeof(*op))) {

@ -80,6 +80,8 @@ struct grpc_subchannel {
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
/** master channel */ /** master channel */
grpc_channel *master; grpc_channel *master;
/** have we seen a disconnection? */
int disconnected;
/** set during connection */ /** set during connection */
grpc_connect_out_args connecting_result; grpc_connect_out_args connecting_result;
@ -152,6 +154,7 @@ static void subchannel_destroy(grpc_subchannel *c);
static void connection_destroy(connection *c) { static void connection_destroy(connection *c) {
GPR_ASSERT(c->refs == 0); GPR_ASSERT(c->refs == 0);
gpr_log(GPR_DEBUG, "CONNECTION_DESTROY %p", c);
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c); gpr_free(c);
} }
@ -342,8 +345,32 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
void grpc_subchannel_process_transport_op(grpc_subchannel *c, void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) { grpc_transport_op *op) {
gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented"); connection *con = NULL;
abort(); grpc_subchannel *destroy;
gpr_mu_lock(&c->mu);
if (op->disconnect) {
c->disconnected = 1;
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
}
if (c->active != NULL) {
con = c->active;
CONNECTION_REF_LOCKED(con, "transport-op");
}
gpr_mu_unlock(&c->mu);
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(top_elem, op);
gpr_mu_lock(&c->mu);
destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
gpr_mu_unlock(&c->mu);
if (destroy) {
subchannel_destroy(destroy);
}
}
} }
static void on_state_changed(void *p, int iomgr_success) { static void on_state_changed(void *p, int iomgr_success) {
@ -388,7 +415,7 @@ static void on_state_changed(void *p, int iomgr_success) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things are starting to go wrong, reconnect but don't deactivate */ /* things are starting to go wrong, reconnect but don't deactivate */
/* released by connection */ /* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connection"); SUBCHANNEL_REF_LOCKED(c, "connecting");
do_connect = 1; do_connect = 1;
c->connecting = 1; c->connecting = 1;
break; break;
@ -397,7 +424,7 @@ static void on_state_changed(void *p, int iomgr_success) {
done: done:
grpc_connectivity_state_set(&c->state_tracker, grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c)); compute_connectivity_locked(c));
destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw); gpr_free(sw);
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
if (do_connect) { if (do_connect) {
@ -450,6 +477,14 @@ static void publish_transport(grpc_subchannel *c) {
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
gpr_free(sw);
gpr_free(filters);
grpc_channel_stack_destroy(stk);
return;
}
/* publish */ /* publish */
if (c->active != NULL && c->active->refs == 0) { if (c->active != NULL && c->active->refs == 0) {
destroy_connection = c->active; destroy_connection = c->active;
@ -464,6 +499,8 @@ static void publish_transport(grpc_subchannel *c) {
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.connectivity_state = &sw->connectivity_state; op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure; op.on_connectivity_state_change = &sw->closure;
SUBCHANNEL_REF_LOCKED(c, "state_watcher");
GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem = elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
elem->filter->start_transport_op(elem, &op); elem->filter->start_transport_op(elem, &op);
@ -491,7 +528,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
} else { } else {
int destroy; int destroy;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting");
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c); if (destroy) subchannel_destroy(c);
/* TODO(ctiller): retry after sleeping */ /* TODO(ctiller): retry after sleeping */
@ -504,6 +541,9 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
} }
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
if (c->disconnected) {
return GRPC_CHANNEL_FATAL_FAILURE;
}
if (c->connecting) { if (c->connecting) {
return GRPC_CHANNEL_CONNECTING; return GRPC_CHANNEL_CONNECTING;
} }

Loading…
Cancel
Save