Take a lock every time through client_channel - to simplify logic and ensure correctness

pull/1369/head
Craig Tiller 10 years ago
parent f87b609169
commit a940752dee
  1. 174
      src/core/channel/client_channel.c
  2. 8
      src/core/surface/call.c

@ -83,8 +83,6 @@ struct call_data {
/* owning element */ /* owning element */
grpc_call_element *elem; grpc_call_element *elem;
gpr_uint8 got_first_op;
call_state state; call_state state;
gpr_timespec deadline; gpr_timespec deadline;
union { union {
@ -129,55 +127,6 @@ static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
child_elem->filter->start_transport_op(child_elem, op); child_elem->filter->start_transport_op(child_elem, op);
} }
static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
grpc_transport_op_finish_with_failure(op);
return;
}
GPR_ASSERT(calld->state == CALL_CREATED);
calld->state = CALL_WAITING;
if (chand->active_child) {
/* channel is connected - use the connected stack */
if (prepare_activate(elem, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
complete_activate(elem, op);
} else {
gpr_mu_unlock(&chand->mu);
}
} else {
/* check to see if we should initiate a connection (if we're not already),
but don't do so until outside the lock to avoid re-entrancy problems if
the callback is immediate */
int initiate_transport_setup = 0;
if (!chand->transport_setup_initiated) {
chand->transport_setup_initiated = 1;
initiate_transport_setup = 1;
}
/* add this call to the waiting set to be resumed once we have a child
channel stack, growing the waiting set if needed */
if (chand->waiting_child_count == chand->waiting_child_capacity) {
chand->waiting_child_capacity =
GPR_MAX(chand->waiting_child_capacity * 2, 8);
chand->waiting_children =
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
/* finally initiate transport setup if needed */
if (initiate_transport_setup) {
grpc_transport_setup_initiate(chand->transport_setup);
}
}
}
static void remove_waiting_child(channel_data *chand, call_data *calld) { static void remove_waiting_child(channel_data *chand, call_data *calld) {
size_t new_count; size_t new_count;
size_t i; size_t i;
@ -217,11 +166,14 @@ static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport
} }
} }
static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { static void cc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_call_element *child_elem; grpc_call_element *child_elem;
grpc_transport_op waiting_op; grpc_transport_op waiting_op;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&chand->mu); gpr_mu_lock(&chand->mu);
switch (calld->state) { switch (calld->state) {
@ -229,55 +181,82 @@ static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
child_elem->filter->start_transport_op(child_elem, op); child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */ break;
case CALL_WAITING:
waiting_op = calld->s.waiting_op;
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
handle_op_after_cancellation(elem, &waiting_op);
handle_op_after_cancellation(elem, op);
return; /* early out */
case CALL_CREATED: case CALL_CREATED:
calld->state = CALL_CANCELLED; if (op->cancel_with_status != GRPC_STATUS_OK) {
gpr_mu_unlock(&chand->mu); calld->state = CALL_CANCELLED;
handle_op_after_cancellation(elem, op); gpr_mu_unlock(&chand->mu);
return; /* early out */ handle_op_after_cancellation(elem, op);
} else {
calld->state = CALL_WAITING;
if (chand->active_child) {
/* channel is connected - use the connected stack */
if (prepare_activate(elem, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
complete_activate(elem, op);
} else {
gpr_mu_unlock(&chand->mu);
}
} else {
/* check to see if we should initiate a connection (if we're not already),
but don't do so until outside the lock to avoid re-entrancy problems if
the callback is immediate */
int initiate_transport_setup = 0;
if (!chand->transport_setup_initiated) {
chand->transport_setup_initiated = 1;
initiate_transport_setup = 1;
}
/* add this call to the waiting set to be resumed once we have a child
channel stack, growing the waiting set if needed */
if (chand->waiting_child_count == chand->waiting_child_capacity) {
chand->waiting_child_capacity =
GPR_MAX(chand->waiting_child_capacity * 2, 8);
chand->waiting_children =
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
/* finally initiate transport setup if needed */
if (initiate_transport_setup) {
grpc_transport_setup_initiate(chand->transport_setup);
}
}
}
break;
case CALL_WAITING:
if (op->cancel_with_status != GRPC_STATUS_OK) {
waiting_op = calld->s.waiting_op;
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
handle_op_after_cancellation(elem, &waiting_op);
handle_op_after_cancellation(elem, op);
} else {
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL));
if (op->send_ops) {
calld->s.waiting_op.send_ops = op->send_ops;
calld->s.waiting_op.is_last_send = op->is_last_send;
calld->s.waiting_op.on_done_send = op->on_done_send;
calld->s.waiting_op.send_user_data = op->send_user_data;
}
if (op->recv_ops) {
calld->s.waiting_op.recv_ops = op->recv_ops;
calld->s.waiting_op.recv_state = op->recv_state;
calld->s.waiting_op.on_done_recv = op->on_done_recv;
calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&chand->mu);
}
break;
case CALL_CANCELLED: case CALL_CANCELLED:
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, op);
return; /* early out */ break;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
}
static void cc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
call_data *calld = elem->call_data;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->cancel_with_status != GRPC_STATUS_OK) {
GPR_ASSERT(op->send_ops == NULL);
GPR_ASSERT(op->recv_ops == NULL);
cancel_rpc(elem, op);
return;
}
if (calld->state == CALL_CANCELLED) {
handle_op_after_cancellation(elem, op);
return;
}
if (!calld->got_first_op) {
calld->got_first_op = 1;
start_rpc(elem, op);
} else {
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
child_elem->filter->start_transport_op(child_elem, op);
} }
} }
@ -375,7 +354,6 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem; calld->elem = elem;
calld->state = CALL_CREATED; calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future; calld->deadline = gpr_inf_future;
calld->got_first_op = 0;
} }
/* Destructor for call_data */ /* Destructor for call_data */

@ -302,10 +302,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->receiving = 1; call->receiving = 1;
grpc_call_internal_ref(call); grpc_call_internal_ref(call);
initial_op_ptr = &initial_op; initial_op_ptr = &initial_op;
} else {
/* we clear this when we've sent initial metadata -- this is very much
a hack to avoid two ops ending up in client_channel */
call->receiving = 2;
} }
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call)); CALL_STACK_FROM_CALL(call));
@ -599,10 +595,6 @@ static void call_on_done_send(void *pc, int success) {
lock(call); lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
if (call->is_client) {
GPR_ASSERT(call->receiving == 2);
call->receiving = 0;
}
} }
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error); finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);

Loading…
Cancel
Save