diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index bcb024f2ac5..507b91b8a63 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -298,6 +298,7 @@ static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; grpc_child_channel *child_channel; + grpc_channel_op rop; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); switch (op->type) { @@ -323,6 +324,10 @@ static void channel_op(grpc_channel_element *elem, if (child_channel) { grpc_child_channel_destroy(child_channel, 1); } + /* fake a transport closed to satisfy the refcounting in client */ + rop.type = GRPC_TRANSPORT_CLOSED; + rop.dir = GRPC_CALL_UP; + grpc_channel_next_op(elem, &rop); break; case GRPC_TRANSPORT_GOAWAY: /* receiving goaway: if it's from our active child, drop the active child; diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index dc280a60c50..9709a665bac 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -35,7 +35,13 @@ #include #include -static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); } +static void bba_destroy(grpc_bbq_array *array, size_t start_pos) { + size_t i; + for (i = start_pos; i < array->count; i++) { + grpc_byte_buffer_destroy(array->data[i]); + } + gpr_free(array->data); +} /* Append an operation to an array, expanding as needed */ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { @@ -47,8 +53,8 @@ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { } void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { - bba_destroy(&q->filling); - bba_destroy(&q->draining); + bba_destroy(&q->filling, 0); + bba_destroy(&q->draining, q->drain_pos); } int grpc_bbq_empty(grpc_byte_buffer_queue *q) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ebd6ace962a..c68ce5a6a81 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -146,10 +146,10 @@ struct grpc_call { /* Active ioreqs. request_set and request_data contain one element per active ioreq operation. - + request_set[op] is an integer specifying a set of operations to which the request belongs: - - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending + - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending completion, and the integer represents to which group of operations the ioreq belongs. Each group is represented by one master, and the integer in request_set is an index into masters to find the master @@ -158,7 +158,7 @@ struct grpc_call { started - finally, if request_set[op] is REQSET_DONE, then the operation is complete and unavailable to be started again - + request_data[op] is the request data as supplied by the initiator of a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT. The set fields are as per the request type specified by op. @@ -200,12 +200,12 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; - /* Data that the legacy api needs to track. To be deleted at some point + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -273,6 +273,7 @@ static void destroy_call(void *call, int ignored_success) { if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } + grpc_bbq_destroy(&c->incoming_queue); gpr_free(c); } @@ -334,7 +335,9 @@ static void unlock(grpc_call *call) { completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int num_completed_requests = call->num_completed_requests; int need_more_data = - call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); + call->need_more_data && + !call->sending && + call->write_state >= WRITE_STATE_STARTED; int i; if (need_more_data) { @@ -853,7 +856,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -941,6 +944,8 @@ struct legacy_state { char *details; grpc_status_code status; + char *send_details; + size_t msg_in_read_idx; grpc_byte_buffer *msg_in; @@ -966,6 +971,8 @@ static void destroy_legacy_state(legacy_state *ls) { } gpr_free(ls->initial_md_in.metadata); gpr_free(ls->trailing_md_in.metadata); + gpr_free(ls->details); + gpr_free(ls->send_details); gpr_free(ls); } @@ -1214,8 +1221,7 @@ grpc_call_error grpc_call_start_write_status_old(grpc_call *call, reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; reqs[1].op = GRPC_IOREQ_SEND_STATUS; reqs[1].data.send_status.code = status; - /* MEMLEAK */ - reqs[1].data.send_status.details = gpr_strdup(details); + reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); reqs[2].op = GRPC_IOREQ_SEND_CLOSE; err = start_ioreq(call, reqs, 3, finish_finish, tag); unlock(call); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index c33ea923e82..b33bd7b3575 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,9 @@ struct grpc_channel { }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_FROM_TOP_ELEM(top_elem) \ + CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, @@ -60,8 +63,8 @@ grpc_channel *grpc_channel_create_from_filters( sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); channel->is_client = is_client; - /* decremented by grpc_channel_destroy */ - gpr_ref_init(&channel->refs, 1); + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); @@ -158,6 +161,10 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_channel_internal_unref(channel); } +void grpc_client_channel_closed(grpc_channel_element *elem) { + grpc_channel_internal_unref(CHANNEL_FROM_TOP_ELEM(elem)); +} + grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index b3ea2ede403..ff9bbc237ef 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -45,6 +45,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +void grpc_client_channel_closed(grpc_channel_element *elem); + void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index fa63e855cc8..64ee9d51e80 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -34,6 +34,7 @@ #include "src/core/surface/client.h" #include "src/core/surface/call.h" +#include "src/core/surface/channel.h" #include "src/core/support/string.h" #include #include @@ -87,7 +88,7 @@ static void channel_op(grpc_channel_element *elem, gpr_log(GPR_ERROR, "Client cannot accept new calls"); break; case GRPC_TRANSPORT_CLOSED: - gpr_log(GPR_ERROR, "Transport closed"); + grpc_client_channel_closed(elem); break; case GRPC_TRANSPORT_GOAWAY: gpr_slice_unref(op->data.goaway.message); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 2f5eff55844..411dbabfd32 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -76,6 +76,9 @@ static void channel_op(grpc_channel_element *elem, case GRPC_CHANNEL_GOAWAY: gpr_slice_unref(op->data.goaway.message); break; + case GRPC_CHANNEL_DISCONNECT: + grpc_client_channel_closed(elem); + break; default: break; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a057694f13a..455bd4337f1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -258,7 +258,6 @@ static void stream_closed(grpc_call_element *elem) { gpr_mu_lock(&chand->server->mu); switch (calld->state) { case ACTIVATED: - grpc_call_stream_closed(elem); break; case PENDING: call_list_remove(chand->server, calld, PENDING_START); @@ -271,6 +270,7 @@ static void stream_closed(grpc_call_element *elem) { break; } gpr_mu_unlock(&chand->server->mu); + grpc_call_stream_closed(elem); } static void read_closed(grpc_call_element *elem) {