Refcounting fixes

pull/2303/head
Craig Tiller 10 years ago
parent 9846503567
commit c396753a36
  1. 2
      src/core/channel/client_channel.c
  2. 8
      src/core/client_config/lb_policies/pick_first.c
  3. 77
      src/core/client_config/subchannel.c
  4. 24
      src/core/client_config/subchannel.h

@ -459,7 +459,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_unref(subchannel_call);
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:

@ -77,7 +77,7 @@ void pf_destroy(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
for (i = 0; i < p->num_subchannels; i++) {
grpc_subchannel_unref(p->subchannels[i]);
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
}
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
@ -180,7 +180,7 @@ loop:
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
p->num_subchannels--;
grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
add_interested_parties_locked(p);
if (p->num_subchannels == 0) {
abort();
@ -206,13 +206,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
subchannels = gpr_malloc(n * sizeof(*subchannels));
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
grpc_subchannel_ref(subchannels[i]);
GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
}
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
grpc_subchannel_process_transport_op(subchannels[i], op);
grpc_subchannel_unref(subchannels[i]);
GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
}
gpr_free(subchannels);
}

@ -123,13 +123,29 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
static void subchannel_ref_locked(grpc_subchannel *c);
static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c);
static grpc_subchannel *connection_unref_locked(connection *c)
static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_subchannel *c);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", (name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", (name), (p), (p)->refs, (p)->refs - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS
#endif
/*
* connection implementation
*/
@ -140,14 +156,16 @@ static void connection_destroy(connection *c) {
gpr_free(c);
}
static void connection_ref_locked(connection *c) {
subchannel_ref_locked(c->subchannel);
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("CONNECTION", c);
subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
++c->refs;
}
static grpc_subchannel *connection_unref_locked(connection *c) {
static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *destroy = NULL;
if (subchannel_unref_locked(c->subchannel)) {
UNREF_LOG("CONNECTION", c);
if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
destroy = c->subchannel;
}
if (--c->refs == 0 && c->subchannel->active != c) {
@ -160,22 +178,26 @@ static grpc_subchannel *connection_unref_locked(connection *c) {
* grpc_subchannel implementation
*/
static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; }
static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("SUBCHANNEL", c);
++c->refs;
}
static int subchannel_unref_locked(grpc_subchannel *c) {
static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("SUBCHANNEL", c);
return --c->refs == 0;
}
void grpc_subchannel_ref(grpc_subchannel *c) {
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_mu_lock(&c->mu);
subchannel_ref_locked(c);
subchannel_ref_locked(c REF_PASS_ARGS);
gpr_mu_unlock(&c->mu);
}
void grpc_subchannel_unref(grpc_subchannel *c) {
void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
int destroy;
gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c);
destroy = subchannel_unref_locked(c REF_PASS_ARGS);
gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c);
}
@ -190,6 +212,7 @@ static void subchannel_destroy(grpc_subchannel *c) {
grpc_mdctx_unref(c->mdctx);
grpc_pollset_set_destroy(&c->pollset_set);
grpc_connectivity_state_destroy(&c->state_tracker);
grpc_connector_unref(c->connector);
gpr_free(c);
}
@ -246,7 +269,7 @@ static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
grpc_subchannel_create_call(w4c->subchannel, &w4c->initial_op, w4c->target,
w4c->notify);
grpc_subchannel_unref(w4c->subchannel);
GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
@ -258,7 +281,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
connection_ref_locked(con);
CONNECTION_REF_LOCKED(con, "call");
gpr_mu_unlock(&c->mu);
*target = create_call(con, initial_op);
@ -271,7 +294,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
w4c->target = target;
w4c->subchannel = c;
/* released when clearing w4c */
subchannel_ref_locked(c);
SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
@ -279,7 +302,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
c->connecting = 1;
connectivity_state_changed_locked(c);
/* released by connection */
subchannel_ref_locked(c);
SUBCHANNEL_REF_LOCKED(c, "connecting");
gpr_mu_unlock(&c->mu);
start_connect(c);
@ -307,7 +330,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
do_connect = 1;
c->connecting = 1;
/* released by connection */
subchannel_ref_locked(c);
SUBCHANNEL_REF_LOCKED(c, "connecting");
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
}
@ -365,7 +388,7 @@ static void on_state_changed(void *p, int iomgr_success) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things are starting to go wrong, reconnect but don't deactivate */
/* released by connection */
subchannel_ref_locked(c);
SUBCHANNEL_REF_LOCKED(c, "connection");
do_connect = 1;
c->connecting = 1;
break;
@ -374,7 +397,7 @@ static void on_state_changed(void *p, int iomgr_success) {
done:
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
destroy = subchannel_unref_locked(c);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
gpr_free(sw);
gpr_mu_unlock(mu);
if (do_connect) {
@ -468,7 +491,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
} else {
int destroy;
gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c);
/* TODO(ctiller): retry after sleeping */
@ -499,18 +522,20 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation
*/
void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); }
void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_ref(&c->refs);
}
void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
void grpc_subchannel_call_unref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (gpr_unref(&c->refs)) {
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
gpr_mu_lock(mu);
destroy = connection_unref_locked(c->connection);
destroy = CONNECTION_UNREF_LOCKED(c->connection, "call");
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy) {
if (destroy != NULL) {
subchannel_destroy(destroy);
}
}

@ -43,10 +43,26 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS , const char *file, int line, const char *reason
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,

Loading…
Cancel
Save