diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 3eb0fe528c0..701a519f26b 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -154,7 +154,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); - gpr_ref_init(&call->internal_refcount, 1); + /* one ref is dropped in response to destroy, the other in + stream_closed */ + gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); return call; @@ -172,19 +174,26 @@ void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } +static void destroy_call(grpc_call *c) { + grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); + grpc_channel_internal_unref(c->channel); + gpr_mu_destroy(&c->mu); + if (c->status_details) { + grpc_mdstr_unref(c->status_details); + } + if (c->legacy_state) { + gpr_free(c->legacy_state->md_out); + gpr_free(c->legacy_state->md_in.metadata); + gpr_free(c->legacy_state->trail_md_in.metadata); + /*gpr_free(c->legacy_state->status_in.details);*/ + gpr_free(c->legacy_state); + } + gpr_free(c); +} + void grpc_call_internal_unref(grpc_call *c) { if (gpr_unref(&c->internal_refcount)) { - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); - grpc_channel_internal_unref(c->channel); - gpr_mu_destroy(&c->mu); - if (c->legacy_state) { - gpr_free(c->legacy_state->md_out); - gpr_free(c->legacy_state->md_in.metadata); - gpr_free(c->legacy_state->trail_md_in.metadata); - /*gpr_free(c->legacy_state->status_in.details);*/ - gpr_free(c->legacy_state); - } - gpr_free(c); + destroy_call(c); } } @@ -225,6 +234,7 @@ static void unlock(grpc_call *call) { sa = choose_send_action(call); if (sa != SEND_NOTHING) { call->sending = 1; + grpc_call_internal_ref(call); } } @@ -245,6 +255,7 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, reqinfo *master = call->requests[op].master; completed_request *cr; size_t i; + gpr_log(GPR_DEBUG, "finish op %d refs=%d", (int)op, (int)call->internal_refcount.count); switch (call->requests[op].state) { case REQ_INITIAL: /* not started yet */ return; @@ -292,6 +303,7 @@ static void finish_write_step(void *pc, grpc_op_error error) { } call->sending = 0; unlock(call); + grpc_call_internal_unref(call); } static void finish_finish_step(void *pc, grpc_op_error error) { @@ -305,6 +317,7 @@ static void finish_finish_step(void *pc, grpc_op_error error) { } call->sending = 0; unlock(call); + grpc_call_internal_unref(call); } static void finish_start_step(void *pc, grpc_op_error error) { @@ -318,6 +331,7 @@ static void finish_start_step(void *pc, grpc_op_error error) { } call->sending = 0; unlock(call); + grpc_call_internal_unref(call); } static send_action choose_send_action(grpc_call *call) { @@ -404,6 +418,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { lock(call); call->sending = 0; unlock(call); + grpc_call_internal_unref(call); break; case SEND_FINISH: if (!call->is_client) { @@ -819,6 +834,9 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { } static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { + lock(call); + grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out); + unlock(call); grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); } @@ -833,7 +851,7 @@ grpc_call_error grpc_call_start_write(grpc_call *call, lock(call); ls = get_legacy_state(call); - ls->msg_out = byte_buffer; + ls->msg_out = grpc_byte_buffer_copy(byte_buffer); req.op = GRPC_IOREQ_SEND_MESSAGES; req.data.send_messages.count = 1; req.data.send_messages.messages = &ls->msg_out; @@ -922,6 +940,7 @@ void grpc_call_read_closed(grpc_call_element *elem) { void grpc_call_stream_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); lock(call); + GPR_ASSERT(!call->stream_closed); if (!call->read_closed) { call->read_closed = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); @@ -931,6 +950,7 @@ void grpc_call_stream_closed(grpc_call_element *elem) { call->stream_closed = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); unlock(call); + grpc_call_internal_unref(call); } /* we offset status by a small amount when storing it into transport metadata @@ -1002,7 +1022,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { : &call->buffered_trailing_metadata; } if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2); + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); dest->metadata = gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); }