Add ability to continue waiting calls

pull/2303/head
Craig Tiller 10 years ago
parent 4ab82d2c4d
commit df91ba52d0
  1. 134
      src/core/client_config/subchannel.c

@ -49,11 +49,20 @@ typedef struct {
grpc_subchannel *subchannel;
} connection;
typedef struct {
grpc_iomgr_closure closure;
size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
} state_watcher;
typedef struct waiting_for_connect {
struct waiting_for_connect *next;
grpc_iomgr_closure *notify;
grpc_transport_stream_op *initial_op;
grpc_transport_stream_op initial_op;
grpc_subchannel_call **target;
grpc_subchannel *subchannel;
grpc_iomgr_closure continuation;
} waiting_for_connect;
struct grpc_subchannel {
@ -85,6 +94,8 @@ struct grpc_subchannel {
/** active connection */
connection *active;
/** version number for the active connection */
size_t active_version;
/** refcount */
int refs;
/** are we connecting */
@ -228,6 +239,16 @@ static void start_connect(grpc_subchannel *c) {
&c->connected);
}
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
grpc_subchannel_create_call(w4c->subchannel,
&w4c->initial_op,
w4c->target,
w4c->notify);
grpc_subchannel_unref(w4c->subchannel);
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
@ -245,8 +266,11 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
w4c->notify = notify;
w4c->initial_op = initial_op;
w4c->initial_op = *initial_op;
w4c->target = target;
w4c->subchannel = c;
subchannel_ref_locked(c);
grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
if (!c->connecting) {
@ -291,7 +315,70 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) {
abort();
abort(); /* not implemented */
}
static void on_state_changed(void *p, int iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
int destroy;
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
int do_connect = 0;
gpr_mu_lock(mu);
/* if we failed or there is a version number mismatch, just leave
this closure */
if (!iomgr_success || sw->subchannel->active_version != sw->version) {
goto done;
}
switch (sw->connectivity_state) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
case GRPC_CHANNEL_IDLE:
/* all is still good: keep watching */
memset(&op, 0, sizeof(op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
elem->filter->start_transport_op(elem, &op);
/* early out */
gpr_mu_unlock(mu);
return;
case GRPC_CHANNEL_FATAL_FAILURE:
/* things have gone wrong, deactivate and enter idle */
if (sw->subchannel->active->refs == 0) {
destroy_connection = sw->subchannel->active;
}
sw->subchannel->active = NULL;
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things are starting to go wrong, reconnect but don't deactivate */
subchannel_ref_locked(c);
do_connect = 1;
c->connecting = 1;
break;
}
done:
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
destroy = subchannel_unref_locked(c);
gpr_free(sw);
gpr_mu_unlock(mu);
if (do_connect) {
start_connect(c);
}
if (destroy) {
subchannel_destroy(c);
}
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
}
static void publish_transport(grpc_subchannel *c) {
@ -301,8 +388,12 @@ static void publish_transport(grpc_subchannel *c) {
size_t num_filters;
const grpc_channel_filter **filters;
waiting_for_connect *w4c;
int destroy;
grpc_transport_op op;
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
@ -310,31 +401,54 @@ static void publish_transport(grpc_subchannel *c) {
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
/* construct channel stack */
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
con = gpr_malloc(sizeof(connection) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
sw = gpr_malloc(sizeof(*sw));
grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
sw->subchannel = c;
sw->connectivity_state = GRPC_CHANNEL_READY;
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->active == NULL);
/* publish */
if (c->active != NULL && c->active->refs == 0) {
destroy_connection = c->active;
}
c->active = con;
c->active_version++;
sw->version = c->active_version;
c->connecting = 0;
/* watch for changes; subchannel ref for connecting is donated
to the state watcher */
memset(&op, 0, sizeof(op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
elem->filter->start_transport_op(elem, &op);
/* signal completion */
connectivity_state_changed_locked(c);
while ((w4c = c->waiting)) {
abort(); /* not implemented */
c->waiting = w4c->next;
grpc_iomgr_add_callback(&w4c->continuation);
}
destroy = subchannel_unref_locked(c);
gpr_mu_unlock(&c->mu);
gpr_free(filters);
if (destroy) {
subchannel_destroy(c);
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
}

Loading…
Cancel
Save