Most of the way to auto-cleanup subchannels

reviewable/pr4232/r5
Craig Tiller 9 years ago
parent 12ad5d6cd6
commit 50ec2670a4
  1. 3
      src/core/channel/channel_stack.c
  2. 1
      src/core/channel/channel_stack.h
  3. 9
      src/core/channel/client_channel.c
  4. 44
      src/core/client_config/lb_policies/pick_first.c
  5. 25
      src/core/client_config/lb_policies/round_robin.c
  6. 6
      src/core/client_config/lb_policy.c
  7. 7
      src/core/client_config/lb_policy.h
  8. 50
      src/core/client_config/subchannel.c
  9. 3
      src/core/client_config/subchannel.h
  10. 6
      src/core/iomgr/fd_posix.c
  11. 13
      src/core/iomgr/pollset_set_posix.c
  12. 2
      src/core/surface/channel.c
  13. 2
      src/core/transport/transport.h
  14. 2
      test/core/channel/channel_stack_test.c
  15. 12
      test/core/end2end/fixtures/h2_uchannel.c

@ -106,6 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
const grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *channel_args,
const char *name,
grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
@ -117,7 +118,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
stack->count = filter_count;
GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
"CHANNEL_STACK");
name);
elems = CHANNEL_ELEMS_FROM_STACK(stack);
user_data =
((char *)elems) +

@ -183,6 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
const char *name,
grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,

@ -260,10 +260,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
lb_policy = chand->lb_policy;
if (lb_policy) {
GRPC_LB_POLICY_REF(lb_policy, "broadcast");
}
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
@ -282,11 +278,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, destroy_resolver);
GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
if (lb_policy) {
grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
}
}
typedef struct {

@ -185,7 +185,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
grpc_connected_subchannel *exclude_subchannel;
@ -199,12 +198,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
if (grpc_subchannel_get_connected_subchannel(subchannels[i]) !=
exclude_subchannel) {
memset(&op, 0, sizeof(op));
op.disconnect = 1;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
}
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
@ -323,41 +316,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&p->mu);
}
static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_transport_op *op) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
grpc_subchannel **subchannels;
grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
selected = p->selected;
if (selected) {
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
}
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
}
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) {
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
}
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
if (p->selected) {
grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected,
"pf_broadcast_to_selected");
}
gpr_free(subchannels);
}
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@ -380,7 +338,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}

@ -451,29 +451,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
}
static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_transport_op *op) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
size_t n;
grpc_subchannel **subchannels;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast");
}
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast");
}
gpr_free(subchannels);
}
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@ -497,7 +474,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};
rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}

@ -61,6 +61,7 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy,
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
#endif
if (gpr_unref(&policy->refs)) {
grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}
@ -83,11 +84,6 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_transport_op *op) {
policy->vtable->broadcast(exec_ctx, policy, op);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}

@ -66,10 +66,6 @@ struct grpc_lb_policy_vtable {
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** broadcast a transport op to all subchannels */
void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_transport_op *op);
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy);
@ -118,9 +114,6 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_transport_op *op);
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,

@ -238,7 +238,7 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
if ((old_refs & STRONG_REF_MASK) == 0) {
if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
disconnect(exec_ctx, c);
}
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
@ -351,7 +351,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
do_connect = 1;
c->connecting = 1;
/* released by connection */
GRPC_SUBCHANNEL_REF(c, "connecting");
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
}
gpr_mu_unlock(&c->mu);
@ -369,40 +369,6 @@ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&c->mu);
}
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_transport_op *op) {
grpc_connected_subchannel *con;
int cancel_alarm = 0;
gpr_mu_lock(&c->mu);
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op");
}
if (op->disconnect) {
c->disconnected = 1;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
}
gpr_mu_unlock(&c->mu);
if (con != NULL) {
grpc_connected_subchannel_process_transport_op(exec_ctx, con, op);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op");
}
if (cancel_alarm) {
grpc_timer_cancel(exec_ctx, &c->alarm);
}
if (op->disconnect) {
grpc_connector_shutdown(exec_ctx, c->connector);
}
}
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_transport_op *op) {
@ -488,7 +454,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
con = gpr_malloc(channel_stack_size);
stk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
num_filters, c->args, stk);
num_filters, c->args, "CONNECTED_SUBCHANNEL", stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@ -507,7 +473,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_free(sw_subchannel);
gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
gpr_free(con);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
return;
}
@ -519,7 +486,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
@ -588,17 +555,18 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
publish_transport(exec_ctx, c);
} else if (c->disconnected) {
/* do nothing */
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);

@ -105,9 +105,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_pollset *pollset);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_transport_op *op);
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
grpc_transport_op *op);

@ -43,6 +43,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#define CLOSURE_NOT_READY ((grpc_closure *)0)
@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) {
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
grpc_iomgr_register_object(&r->iomgr_object, name);
char *name2;
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif

@ -52,7 +52,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
size_t i;
gpr_mu_destroy(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
}
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
@ -74,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} else {
grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
pollset_set->fds[j++] = pollset_set->fds[i];
@ -107,12 +107,13 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&bag->mu);
if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
bag->pollset_sets = gpr_realloc(bag->pollset_sets,
bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
}
bag->pollset_sets[bag->pollset_set_count++] = item;
for (i = 0, j = 0; i < bag->fd_count; i++) {
if (grpc_fd_is_orphaned(bag->fds[i])) {
GRPC_FD_UNREF(bag->fds[i], "pollset");
GRPC_FD_UNREF(bag->fds[i], "pollset_set");
} else {
grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
bag->fds[j++] = bag->fds[i];
@ -130,7 +131,9 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--;
GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]);
GPR_SWAP(grpc_pollset_set *,
bag->pollset_sets[i],
bag->pollset_sets[bag->pollset_set_count]);
break;
}
}

@ -152,7 +152,7 @@ grpc_channel *grpc_channel_create_from_filters(
}
grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
num_filters, args,
num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;

@ -50,8 +50,6 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
#define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount {
gpr_refcount refs;
grpc_closure destroy;

@ -116,7 +116,7 @@ static void test_create_channel_stack(void) {
channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1));
grpc_channel_stack_init(&exec_ctx, 1, free_channel, channel_stack, &filters,
1, &chan_args, channel_stack);
1, &chan_args, "test", channel_stack);
GPR_ASSERT(channel_stack->count == 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = (int *)channel_elem->channel_data;

@ -233,11 +233,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack(
}
grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE;
grpc_pollset_set g_interested_parties;
static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
if (g_state != GRPC_CHANNEL_READY) {
grpc_subchannel_notify_on_state_change(
exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg));
exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg));
}
}
@ -247,12 +248,11 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) {
grpc_pollset pollset;
grpc_pollset_set interested_parties;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_init(&pollset);
grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset);
grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties);
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state,
grpc_pollset_set_init(&g_interested_parties);
grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset);
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state,
grpc_closure_create(state_changed, c));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pollset));
@ -267,8 +267,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) {
}
grpc_pollset_shutdown(&exec_ctx, &pollset,
grpc_closure_create(destroy_pollset, &pollset));
grpc_pollset_set_destroy(&g_interested_parties);
gpr_mu_unlock(GRPC_POLLSET_MU(&pollset));
grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties);
grpc_exec_ctx_finish(&exec_ctx);
return grpc_subchannel_get_connected_subchannel(c);
}

Loading…
Cancel
Save