From c1f7560ac27b6db4106115e5308f1a9124378a60 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Apr 2015 11:44:53 -0700 Subject: [PATCH] Stream mapping fixes --- src/core/surface/call.c | 9 ++++++--- src/core/transport/chttp2_transport.c | 12 ++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dbf78f2cfe4..2f514465fc7 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -687,7 +687,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; size_t i; - int unref = 0; + int unref_due_to_connection_close = 0; lock(call); call->receiving = 0; if (success) { @@ -714,7 +714,7 @@ static void call_on_done_recv(void *pc, int success) { if (call->recv_state == GRPC_STREAM_CLOSED) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; - unref = 1; + unref_due_to_connection_close = 1; } finish_read_ops(call); } else { @@ -725,9 +725,11 @@ static void call_on_done_recv(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR); finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR); } + call->recv_ops.nops = 0; unlock(call); - if (unref) { + grpc_call_internal_unref(call, 0); + if (unref_due_to_connection_close) { grpc_call_internal_unref(call, 0); } } @@ -798,6 +800,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; call->write_state = WRITE_STATE_STARTED; + call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a02fb936770..237def41aa6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -306,7 +306,6 @@ struct stream { gpr_uint8 send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; - gpr_uint8 published_close; op_closure send_done_closure; op_closure recv_done_closure; @@ -731,6 +730,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) { static void remove_from_stream_map(transport *t, stream *s) { if (s->id == 0) return; + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", t->is_client? "CLI" : "SVR", + s->id)); if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { maybe_start_some_streams(t); } @@ -999,6 +1000,8 @@ static void maybe_start_some_streams(transport *t) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) break; + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client? "CLI" : "SVR", s, t->next_stream_id)); + GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; @@ -1018,6 +1021,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { s->write_state = WRITE_STATE_QUEUED_CLOSE; } if (s->id == 0) { + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New stream %p waiting for concurrency", t->is_client? "CLI" : "SVR", s)); stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); } else if (s->outgoing_window > 0) { @@ -1300,7 +1304,8 @@ static void on_header(void *tp, grpc_mdelem *md) { GPR_ASSERT(s); - IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id, + IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, + t->is_client? "CLI" : "SVR", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); @@ -1872,6 +1877,9 @@ static void finish_reads(transport *t) { if (*s->publish_state != s->published_state) { s->published_state = *s->publish_state; publish = 1; + if (s->published_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + } } if (s->parser.incoming_sopb.nops > 0) { grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);