Ensure we dont destroy a call until it is closed

pull/2149/head
Craig Tiller 10 years ago
parent a603a450a9
commit f3fba749aa
  1. 12
      src/core/surface/call.c

@ -148,6 +148,8 @@ struct grpc_call {
gpr_uint8 receiving; gpr_uint8 receiving;
/* are we currently completing requests */ /* are we currently completing requests */
gpr_uint8 completing; gpr_uint8 completing;
/** has grpc_call_destroy been called */
gpr_uint8 destroy_called;
/* pairs with completed_requests */ /* pairs with completed_requests */
gpr_uint8 num_completed_requests; gpr_uint8 num_completed_requests;
/* are we currently reading a message? */ /* are we currently reading a message? */
@ -294,8 +296,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops); grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message); gpr_slice_buffer_init(&call->incoming_message);
/* dropped in destroy */ /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
gpr_ref_init(&call->internal_refcount, 1); gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata. /* server hack: start reads immediately so we can get initial metadata.
TODO(ctiller): figure out a cleaner solution */ TODO(ctiller): figure out a cleaner solution */
if (!call->is_client) { if (!call->is_client) {
@ -436,7 +438,8 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) || grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
(call->cancel_with_status != GRPC_STATUS_OK); (call->cancel_with_status != GRPC_STATUS_OK) ||
call->destroy_called;
} }
static void unlock(grpc_call *call) { static void unlock(grpc_call *call) {
@ -774,6 +777,7 @@ static void call_on_done_recv(void *pc, int success) {
grpc_alarm_cancel(&call->alarm); grpc_alarm_cancel(&call->alarm);
call->have_alarm = 0; call->have_alarm = 0;
} }
GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
} }
finish_read_ops(call); finish_read_ops(call);
} else { } else {
@ -1036,6 +1040,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) { void grpc_call_destroy(grpc_call *c) {
int cancel; int cancel;
lock(c); lock(c);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
if (c->have_alarm) { if (c->have_alarm) {
grpc_alarm_cancel(&c->alarm); grpc_alarm_cancel(&c->alarm);
c->have_alarm = 0; c->have_alarm = 0;

Loading…
Cancel
Save