refcount debugging

pull/2303/head
Craig Tiller 10 years ago
parent c7b5f7605e
commit ca3e9d3e57
  1. 94
      src/core/channel/client_channel.c
  2. 3
      src/core/channel/connectivity_state.c
  3. 6
      src/core/client_config/lb_policies/pick_first.c
  4. 4
      src/core/client_config/lb_policy.c
  5. 2
      src/core/client_config/lb_policy.h
  6. 94
      src/core/client_config/subchannel.c
  7. 22
      src/core/client_config/subchannel.h
  8. 28
      src/core/surface/channel.c
  9. 2
      src/core/surface/channel.h

@ -38,6 +38,7 @@
#include "src/core/channel/channel_args.h" #include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h" #include "src/core/channel/connected_channel.h"
#include "src/core/channel/connectivity_state.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h" #include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
@ -68,8 +69,7 @@ typedef struct {
/** resolver callback */ /** resolver callback */
grpc_iomgr_closure on_config_changed; grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */ /** connectivity state being tracked */
grpc_iomgr_closure *on_connectivity_state_change; grpc_connectivity_state_tracker state_tracker;
grpc_connectivity_state *connectivity_state;
} channel_data; } channel_data;
typedef enum { typedef enum {
@ -98,60 +98,6 @@ struct call_data {
grpc_linked_mdelem details; grpc_linked_mdelem details;
}; };
#if 0
static int prepare_activate(grpc_call_element *elem,
grpc_child_channel *on_child) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (calld->state == CALL_CANCELLED) return 0;
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
if (calld->waiting_op.bind_pollset) {
grpc_transport_setup_del_interested_party(chand->transport_setup,
calld->waiting_op.bind_pollset);
}
calld->state = CALL_ACTIVE;
/* create a child call */
/* TODO(ctiller): pass the waiting op down here */
calld->s.active.child_call =
grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}
static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
child_elem->filter->start_transport_op(child_elem, op);
}
static void remove_waiting_child(channel_data *chand, call_data *calld) {
size_t new_count;
size_t i;
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
if (chand->waiting_children[i] == calld) {
grpc_transport_setup_del_interested_party(
chand->transport_setup, calld->waiting_op.bind_pollset);
continue;
}
chand->waiting_children[new_count++] = chand->waiting_children[i];
}
GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
new_count == chand->waiting_child_count);
chand->waiting_child_count = new_count;
}
#endif
static void handle_op_after_cancellation(grpc_call_element *elem, static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
@ -426,7 +372,39 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
} }
} }
static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_iomgr_closure *on_consumed = op->on_consumed;
op->on_consumed = NULL;
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
if (!is_empty(op, sizeof(*op))) {
lb_policy = chand->lb_policy;
if (lb_policy) {
grpc_lb_policy_ref(lb_policy);
}
}
gpr_mu_unlock(&chand->mu_config);
if (lb_policy) {
grpc_lb_policy_broadcast(lb_policy, op);
grpc_lb_policy_unref(lb_policy);
}
if (on_consumed) {
grpc_iomgr_add_callback(on_consumed);
}
}
/* Constructor for call_data */ /* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem, static void init_call_elem(grpc_call_element *elem,
@ -458,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE: case CALL_ACTIVE:
subchannel_call = calld->subchannel_call; subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_unref(subchannel_call); GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel");
break; break;
case CALL_CREATED: case CALL_CREATED:
case CALL_CANCELLED: case CALL_CANCELLED:

@ -75,6 +75,9 @@ int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_track
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) {
grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w; grpc_connectivity_state_watcher *w;
if (tracker->current_state == state) {
return;
}
tracker->current_state = state; tracker->current_state = state;
while ((w = tracker->watchers)) { while ((w = tracker->watchers)) {
tracker->watchers = w->next; tracker->watchers = w->next;

@ -173,7 +173,7 @@ loop:
p->checking_subchannel %= p->num_subchannels; p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
p->num_subchannels--; p->num_subchannels--;
grpc_subchannel_unref(p->subchannels[p->num_subchannels]); GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
add_interested_parties_locked(p); add_interested_parties_locked(p);
if (p->num_subchannels == 0) { if (p->num_subchannels == 0) {
abort(); abort();
@ -199,13 +199,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
subchannels = gpr_malloc(n * sizeof(*subchannels)); subchannels = gpr_malloc(n * sizeof(*subchannels));
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i]; subchannels[i] = p->subchannels[i];
grpc_subchannel_ref(subchannels[i]); GRPC_SUBCHANNEL_REF(subchannels[i], "broadcast");
} }
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
grpc_subchannel_process_transport_op(subchannels[i], op); grpc_subchannel_process_transport_op(subchannels[i], op);
grpc_subchannel_unref(subchannels[i]); GRPC_SUBCHANNEL_UNREF(subchannels[i], "broadcast");
} }
gpr_free(subchannels); gpr_free(subchannels);
} }

@ -49,3 +49,7 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_iomgr_closure *on_complete) { grpc_iomgr_closure *on_complete) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete); policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete);
} }
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
policy->vtable->broadcast(policy, op);
}

@ -85,4 +85,6 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_subchannel **target, grpc_subchannel **target,
grpc_iomgr_closure *on_complete); grpc_iomgr_closure *on_complete);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */

@ -105,14 +105,68 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success); static void subchannel_connected(void *subchannel, int iomgr_success);
/*
* connection implementation
*/
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r))
#else
#define CONNECTION_REF(c, r) connection_ref((c))
#define CONNECTION_UNREF(c, r) connection_unref((c))
#endif
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
static void connection_ref(connection *c, const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s",
c, (int)c->refs.count, (int)c->refs.count + 1,
reason);
#else
static void connection_ref(connection *c) {
#endif
gpr_ref(&c->refs);
}
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
static void connection_unref(connection *c, const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s",
c, (int)c->refs.count, (int)c->refs.count - 1,
reason);
#else
static void connection_unref(connection *c) {
#endif
if (gpr_unref(&c->refs)) {
GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection");
gpr_free(c);
}
}
/* /*
* grpc_subchannel implementation * grpc_subchannel implementation
*/ */
void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); } #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s",
c, (int)c->refs.count, (int)c->refs.count + 1,
reason);
#else
void grpc_subchannel_ref(grpc_subchannel *c) {
#endif
gpr_ref(&c->refs);
}
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s",
c, (int)c->refs.count, (int)c->refs.count - 1,
reason);
#else
void grpc_subchannel_unref(grpc_subchannel *c) { void grpc_subchannel_unref(grpc_subchannel *c) {
#endif
if (gpr_unref(&c->refs)) { if (gpr_unref(&c->refs)) {
if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel");
gpr_free(c->filters); gpr_free(c->filters);
grpc_channel_args_destroy(c->args); grpc_channel_args_destroy(c->args);
gpr_free(c->addr); gpr_free(c->addr);
@ -178,7 +232,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
if (c->active != NULL) { if (c->active != NULL) {
con = c->active; con = c->active;
gpr_ref(&con->refs); CONNECTION_REF(con, "call");
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
*target = create_call(con, initial_op); *target = create_call(con, initial_op);
@ -194,7 +248,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
if (!c->connecting) { if (!c->connecting) {
c->connecting = 1; c->connecting = 1;
connectivity_state_changed_locked(c); connectivity_state_changed_locked(c);
grpc_subchannel_ref(c); GRPC_SUBCHANNEL_REF(c, "connection");
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
start_connect(c); start_connect(c);
@ -220,7 +274,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
do_connect = 1; do_connect = 1;
c->connecting = 1; c->connecting = 1;
grpc_subchannel_ref(c); GRPC_SUBCHANNEL_REF(c, "connection");
grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
} }
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
@ -275,7 +329,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
if (c->connecting_result.transport) { if (c->connecting_result.transport) {
publish_transport(c); publish_transport(c);
} else { } else {
grpc_subchannel_unref(c); GRPC_SUBCHANNEL_UNREF(c, "connection");
/* TODO(ctiller): retry after sleeping */ /* TODO(ctiller): retry after sleeping */
abort(); abort();
} }
@ -304,17 +358,29 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation * grpc_subchannel_call implementation
*/ */
void grpc_subchannel_call_ref(grpc_subchannel_call *call) { #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
gpr_ref(&call->refs); void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s",
c, (int)c->refs.count, (int)c->refs.count + 1,
reason);
#else
void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
#endif
gpr_ref(&c->refs);
} }
void grpc_subchannel_call_unref(grpc_subchannel_call *call) { #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
if (gpr_unref(&call->refs)) { void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call)); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s",
if (gpr_unref(&call->connection->refs)) { c, (int)c->refs.count, (int)c->refs.count - 1,
gpr_free(call->connection); reason);
} #else
gpr_free(call); void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
#endif
if (gpr_unref(&c->refs)) {
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
CONNECTION_UNREF(c->connection, "call");
gpr_free(c);
} }
} }

@ -37,14 +37,33 @@
#include "src/core/channel/channel_stack.h" #include "src/core/channel/channel_stack.h"
#include "src/core/client_config/connector.h" #include "src/core/client_config/connector.h"
#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
/** A (sub-)channel that knows how to connect to exactly one target /** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */ address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_args grpc_subchannel_args;
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c), __FILE__, __LINE__, (r))
void grpc_subchannel_ref(grpc_subchannel *channel, const char *file, int line, const char *reason);
void grpc_subchannel_unref(grpc_subchannel *channel, const char *file, int line, const char *reason);
void grpc_subchannel_call_ref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
void grpc_subchannel_call_unref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
#else
#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c))
#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c))
#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c))
#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c))
void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel);
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
#endif
/** construct a call (possibly asynchronously) */ /** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel, void grpc_subchannel_create_call(grpc_subchannel *subchannel,
@ -55,9 +74,6 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel,
/** process a transport level op */ /** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op);
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
/** poll the current connectivity state of a channel */ /** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel); grpc_subchannel *channel);

@ -93,9 +93,8 @@ grpc_channel *grpc_channel_create_from_filters(
grpc_channel *channel = gpr_malloc(size); grpc_channel *channel = gpr_malloc(size);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client; channel->is_client = is_client;
/* decremented by grpc_channel_destroy, and grpc_client_channel_closed if /* decremented by grpc_channel_destroy */
* is_client */ gpr_ref_init(&channel->refs, 1);
gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx; channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string = channel->grpc_compression_level_string =
@ -237,33 +236,10 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
} }
} }
static void default_consumed(void *arg, int iomgr_success) {
grpc_channel *channel = arg;
GRPC_CHANNEL_INTERNAL_UNREF(channel, "op");
}
static void execute_op(grpc_channel *channel, grpc_transport_op *op) {
grpc_channel_element *elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
if (op->on_consumed == NULL) {
GRPC_CHANNEL_INTERNAL_REF(channel, "op");
op->on_consumed = gpr_malloc(sizeof(*op->on_consumed));
grpc_iomgr_closure_init(op->on_consumed, default_consumed, channel);
}
elem->filter->start_transport_op(elem, op);
}
void grpc_channel_destroy(grpc_channel *channel) { void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op;
memset(&op, 0, sizeof(op));
op.disconnect = 1;
execute_op(channel, &op);
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
} }
void grpc_client_channel_closed(grpc_channel_element *elem) {
GRPC_CHANNEL_INTERNAL_UNREF(CHANNEL_FROM_TOP_ELEM(elem), "closed");
}
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel); return CHANNEL_STACK_FROM_CHANNEL(channel);
} }

@ -58,8 +58,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
void grpc_client_channel_closed(grpc_channel_element *elem);
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);

Loading…
Cancel
Save