From c396753a367677a4f2a111a850c2290dd27e2047 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Jun 2015 14:59:38 -0700 Subject: [PATCH] Refcounting fixes --- src/core/channel/client_channel.c | 2 +- .../client_config/lb_policies/pick_first.c | 8 +- src/core/client_config/subchannel.c | 77 ++++++++++++------- src/core/client_config/subchannel.h | 24 +++++- 4 files changed, 76 insertions(+), 35 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index ee0d2cd9bda..d3d2f42ec1c 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -459,7 +459,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_unref(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel"); break; case CALL_CREATED: case CALL_CANCELLED: diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c94408200ba..a8e5b5cc4a5 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -77,7 +77,7 @@ void pf_destroy(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_unref(p->subchannels[i]); + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first"); } gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); @@ -180,7 +180,7 @@ loop: p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); p->num_subchannels--; - grpc_subchannel_unref(p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); add_interested_parties_locked(p); if (p->num_subchannels == 0) { abort(); @@ -206,13 +206,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { subchannels = gpr_malloc(n * sizeof(*subchannels)); for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; - grpc_subchannel_ref(subchannels[i]); + GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); } gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { grpc_subchannel_process_transport_op(subchannels[i], op); - grpc_subchannel_unref(subchannels[i]); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast"); } gpr_free(subchannels); } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6f4bf2ebe8b..3d065761ab9 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -123,13 +123,29 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); -static void subchannel_ref_locked(grpc_subchannel *c); -static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT; -static void connection_ref_locked(connection *c); -static grpc_subchannel *connection_unref_locked(connection *c) +static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; +static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_subchannel *c); +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p), __FILE__, __LINE__, (r)) +#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r)) +#define REF_PASS_ARGS , file, line, reason +#define REF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", (name), (p), (p)->refs, (p)->refs + 1, reason) +#define UNREF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", (name), (p), (p)->refs, (p)->refs - 1, reason) +#else +#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) +#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) +#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r)) +#define REF_PASS_ARGS +#endif + /* * connection implementation */ @@ -140,14 +156,16 @@ static void connection_destroy(connection *c) { gpr_free(c); } -static void connection_ref_locked(connection *c) { - subchannel_ref_locked(c->subchannel); +static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("CONNECTION", c); + subchannel_ref_locked(c->subchannel REF_PASS_ARGS); ++c->refs; } -static grpc_subchannel *connection_unref_locked(connection *c) { +static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *destroy = NULL; - if (subchannel_unref_locked(c->subchannel)) { + UNREF_LOG("CONNECTION", c); + if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { destroy = c->subchannel; } if (--c->refs == 0 && c->subchannel->active != c) { @@ -160,22 +178,26 @@ static grpc_subchannel *connection_unref_locked(connection *c) { * grpc_subchannel implementation */ -static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; } +static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("SUBCHANNEL", c); + ++c->refs; +} -static int subchannel_unref_locked(grpc_subchannel *c) { +static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + UNREF_LOG("SUBCHANNEL", c); return --c->refs == 0; } -void grpc_subchannel_ref(grpc_subchannel *c) { +void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_mu_lock(&c->mu); - subchannel_ref_locked(c); + subchannel_ref_locked(c REF_PASS_ARGS); gpr_mu_unlock(&c->mu); } -void grpc_subchannel_unref(grpc_subchannel *c) { +void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { int destroy; gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c); + destroy = subchannel_unref_locked(c REF_PASS_ARGS); gpr_mu_unlock(&c->mu); if (destroy) subchannel_destroy(c); } @@ -190,6 +212,7 @@ static void subchannel_destroy(grpc_subchannel *c) { grpc_mdctx_unref(c->mdctx); grpc_pollset_set_destroy(&c->pollset_set); grpc_connectivity_state_destroy(&c->state_tracker); + grpc_connector_unref(c->connector); gpr_free(c); } @@ -246,7 +269,7 @@ static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; grpc_subchannel_create_call(w4c->subchannel, &w4c->initial_op, w4c->target, w4c->notify); - grpc_subchannel_unref(w4c->subchannel); + GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } @@ -258,7 +281,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; - connection_ref_locked(con); + CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); *target = create_call(con, initial_op); @@ -271,7 +294,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, w4c->target = target; w4c->subchannel = c; /* released when clearing w4c */ - subchannel_ref_locked(c); + SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; grpc_subchannel_add_interested_party(c, initial_op->bind_pollset); @@ -279,7 +302,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, c->connecting = 1; connectivity_state_changed_locked(c); /* released by connection */ - subchannel_ref_locked(c); + SUBCHANNEL_REF_LOCKED(c, "connecting"); gpr_mu_unlock(&c->mu); start_connect(c); @@ -307,7 +330,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, do_connect = 1; c->connecting = 1; /* released by connection */ - subchannel_ref_locked(c); + SUBCHANNEL_REF_LOCKED(c, "connecting"); grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); } @@ -365,7 +388,7 @@ static void on_state_changed(void *p, int iomgr_success) { case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things are starting to go wrong, reconnect but don't deactivate */ /* released by connection */ - subchannel_ref_locked(c); + SUBCHANNEL_REF_LOCKED(c, "connection"); do_connect = 1; c->connecting = 1; break; @@ -374,7 +397,7 @@ static void on_state_changed(void *p, int iomgr_success) { done: grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); - destroy = subchannel_unref_locked(c); + destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); gpr_free(sw); gpr_mu_unlock(mu); if (do_connect) { @@ -468,7 +491,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { } else { int destroy; gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c); + destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection"); gpr_mu_unlock(&c->mu); if (destroy) subchannel_destroy(c); /* TODO(ctiller): retry after sleeping */ @@ -499,18 +522,20 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) { * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); } +void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_ref(&c->refs); +} -void grpc_subchannel_call_unref(grpc_subchannel_call *c) { +void grpc_subchannel_call_unref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { if (gpr_unref(&c->refs)) { gpr_mu *mu = &c->connection->subchannel->mu; grpc_subchannel *destroy; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); gpr_mu_lock(mu); - destroy = connection_unref_locked(c->connection); + destroy = CONNECTION_UNREF_LOCKED(c->connection, "call"); gpr_mu_unlock(mu); gpr_free(c); - if (destroy) { + if (destroy != NULL) { subchannel_destroy(destroy); } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 766258846a4..97a64c488d7 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -43,10 +43,26 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; -void grpc_subchannel_ref(grpc_subchannel *channel); -void grpc_subchannel_unref(grpc_subchannel *channel); -void grpc_subchannel_call_ref(grpc_subchannel_call *call); -void grpc_subchannel_call_unref(grpc_subchannel_call *call); +#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG + +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS , const char *file, int line, const char *reason +#else +#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) +#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) +#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) +#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS +#endif + +void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel,