Handle reffing when a cancel or bind gets stuck in the waiting queue

pull/1888/head
Craig Tiller 10 years ago
parent f93fd05a97
commit 5dde66ecac
  1. 6
      src/core/channel/channel_stack.c
  2. 6
      src/core/channel/client_channel.c
  3. 75
      src/core/surface/call.c
  4. 3
      src/core/surface/lame_client.c
  5. 7
      src/core/transport/chttp2_transport.c
  6. 3
      src/core/transport/transport.c
  7. 3
      src/core/transport/transport.h

@ -211,9 +211,3 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
op.cancel_with_status = GRPC_STATUS_CANCELLED;
grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
abort();
}

@ -171,6 +171,9 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv(op->recv_user_data, 1);
}
if (op->on_consumed) {
op->on_consumed(op->on_consumed_user_data, 0);
}
}
static void cc_start_transport_op(grpc_call_element *elem,
@ -264,6 +267,9 @@ static void cc_start_transport_op(grpc_call_element *elem,
calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&chand->mu);
if (op->on_consumed) {
op->on_consumed(op->on_consumed_user_data, 0);
}
}
break;
case CALL_CANCELLED:

@ -156,6 +156,8 @@ struct grpc_call {
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
/* cancel with this status on the next outgoing transport op */
grpc_status_code cancel_with_status;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@ -247,8 +249,7 @@ static void execute_op(grpc_call *call, grpc_transport_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description,
gpr_uint8 locked);
const char *description);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@ -415,6 +416,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
/* TODO(ctiller): this needs some serious cleanup */
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
@ -423,7 +425,8 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client);
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
(call->cancel_with_status != GRPC_STATUS_OK);
}
static void unlock(grpc_call *call) {
@ -435,11 +438,9 @@ static void unlock(grpc_call *call) {
memset(&op, 0, sizeof(op));
if (!call->bound_pollset && call->cq) {
call->bound_pollset = 1;
op.bind_pollset = grpc_cq_pollset(call->cq);
start_op = 1;
}
op.cancel_with_status = call->cancel_with_status;
start_op = op.cancel_with_status != GRPC_STATUS_OK;
call->cancel_with_status = GRPC_STATUS_OK; /* reset */
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
@ -459,6 +460,12 @@ static void unlock(grpc_call *call) {
}
}
if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
call->bound_pollset = 1;
op.bind_pollset = grpc_cq_pollset(call->cq);
start_op = 1;
}
if (!call->completing && call->num_completed_requests != 0) {
completing_requests = call->num_completed_requests;
memcpy(completed_requests, call->completed_requests,
@ -665,7 +672,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_asprintf(
&message, "Message terminated early; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
}
@ -676,7 +683,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
&message,
"Maximum message length of %d exceeded by a message of length %d",
grpc_channel_get_max_message_length(call->channel), msg.length);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
} else if (msg.length > 0) {
@ -697,7 +704,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
/* we have to be reading a message to know what to do here */
if (!call->reading_message) {
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
"Received payload data while not reading a message", 1);
"Received payload data while not reading a message");
return 0;
}
/* append the slice to the incoming buffer */
@ -708,7 +715,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
gpr_asprintf(
&message, "Receiving message overflow; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
} else if (call->incoming_message.length == call->incoming_message_length) {
@ -1040,35 +1047,43 @@ grpc_call_error grpc_call_cancel(grpc_call *call) {
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
return cancel_with_status(c, status, description, 0);
grpc_call_error r;
lock(c);
r = cancel_with_status(c, status, description);
unlock(c);
return r;
}
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description,
gpr_uint8 locked) {
grpc_transport_op op;
const char *description) {
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
memset(&op, 0, sizeof(op));
op.cancel_with_status = status;
if (locked == 0) {
lock(c);
}
GPR_ASSERT(status != GRPC_STATUS_OK);
set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
if (locked == 0) {
unlock(c);
}
execute_op(c, &op);
c->cancel_with_status = status;
return GRPC_CALL_OK;
}
static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
op->on_consumed = finished_loose_op;
op->on_consumed_user_data = call;
}
elem = CALL_ELEM_FROM_CALL(call, 0);
op->context = call->context;
elem->filter->start_transport_op(elem, op);
@ -1081,12 +1096,10 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
if (success) {
if (call->is_client) {
cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", 0);
} else {
grpc_call_cancel(call);
}
lock(call);
cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
unlock(call);
}
GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
}

@ -77,6 +77,9 @@ static void lame_start_transport_op(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv(op->recv_user_data, 1);
}
if (op->on_consumed) {
op->on_consumed(op->on_consumed_user_data, 0);
}
}
static void channel_op(grpc_channel_element *elem,

@ -1162,6 +1162,13 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->bind_pollset) {
add_to_pollset_locked(t, op->bind_pollset);
}
if (op->on_consumed) {
op_closure c;
c.cb = op->on_consumed;
c.user_data = op->on_consumed_user_data;
schedule_cb(t, c, 1);
}
}
static void perform_op(grpc_transport *gt, grpc_stream *gs,

@ -103,6 +103,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
if (op->recv_ops) {
op->on_done_recv(op->recv_user_data, 0);
}
if (op->on_consumed) {
op->on_consumed(op->on_consumed_user_data, 0);
}
}
void grpc_transport_op_add_cancellation(grpc_transport_op *op,

@ -64,6 +64,9 @@ typedef enum grpc_stream_state {
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
void (*on_consumed)(void *user_data, int success);
void *on_consumed_user_data;
grpc_stream_op_buffer *send_ops;
int is_last_send;
void (*on_done_send)(void *user_data, int success);

Loading…
Cancel
Save