fix a deadlock in cancel call

pull/1464/head
Yang Gao 10 years ago
parent ea13af73e0
commit ff30f8e8a9
  1. 33
      src/core/surface/call.c

@ -246,6 +246,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op); static void execute_op(grpc_call *call, grpc_transport_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call); static void finish_read_ops(grpc_call *call);
static grpc_call_error grpc_call_cancel_with_status_internal(grpc_call *c,
grpc_status_code status,
const char *description,
gpr_uint8 locked);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data, const void *server_transport_data,
@ -627,7 +631,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_asprintf( gpr_asprintf(
&message, "Message terminated early; read %d bytes, expected %d", &message, "Message terminated early; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length); (int)call->incoming_message.length, (int)call->incoming_message_length);
grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); grpc_call_cancel_with_status_internal(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
gpr_free(message); gpr_free(message);
return 0; return 0;
} }
@ -638,7 +642,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
&message, &message,
"Maximum message length of %d exceeded by a message of length %d", "Maximum message length of %d exceeded by a message of length %d",
grpc_channel_get_max_message_length(call->channel), msg.length); grpc_channel_get_max_message_length(call->channel), msg.length);
grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); grpc_call_cancel_with_status_internal(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
gpr_free(message); gpr_free(message);
return 0; return 0;
} else if (msg.length > 0) { } else if (msg.length > 0) {
@ -658,9 +662,9 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
} }
/* we have to be reading a message to know what to do here */ /* we have to be reading a message to know what to do here */
if (!call->reading_message) { if (!call->reading_message) {
grpc_call_cancel_with_status( grpc_call_cancel_with_status_internal(
call, GRPC_STATUS_INVALID_ARGUMENT, call, GRPC_STATUS_INVALID_ARGUMENT,
"Received payload data while not reading a message"); "Received payload data while not reading a message", 1);
return 0; return 0;
} }
/* append the slice to the incoming buffer */ /* append the slice to the incoming buffer */
@ -671,7 +675,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
gpr_asprintf( gpr_asprintf(
&message, "Receiving message overflow; read %d bytes, expected %d", &message, "Receiving message overflow; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length); (int)call->incoming_message.length, (int)call->incoming_message_length);
grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); grpc_call_cancel_with_status_internal(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
gpr_free(message); gpr_free(message);
return 0; return 0;
} else if (call->incoming_message.length == call->incoming_message_length) { } else if (call->incoming_message.length == call->incoming_message_length) {
@ -996,6 +1000,13 @@ grpc_call_error grpc_call_cancel(grpc_call *call) {
grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status, grpc_status_code status,
const char *description) { const char *description) {
return grpc_call_cancel_with_status_internal(c, status, description, 0);
}
static grpc_call_error grpc_call_cancel_with_status_internal(grpc_call *c,
grpc_status_code status,
const char *description,
gpr_uint8 locked) {
grpc_transport_op op; grpc_transport_op op;
grpc_mdstr *details = grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description) description ? grpc_mdstr_from_string(c->metadata_context, description)
@ -1003,10 +1014,14 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.cancel_with_status = status; op.cancel_with_status = status;
lock(c); if (locked == 0) {
lock(c);
}
set_status_code(c, STATUS_FROM_API_OVERRIDE, status); set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
set_status_details(c, STATUS_FROM_API_OVERRIDE, details); set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
unlock(c); if (locked == 0) {
unlock(c);
}
execute_op(c, &op); execute_op(c, &op);
@ -1027,8 +1042,8 @@ static void call_alarm(void *arg, int success) {
grpc_call *call = arg; grpc_call *call = arg;
if (success) { if (success) {
if (call->is_client) { if (call->is_client) {
grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, grpc_call_cancel_with_status_internal(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded"); "Deadline Exceeded", 0);
} else { } else {
grpc_call_cancel(call); grpc_call_cancel(call);
} }

Loading…
Cancel
Save