Stream mapping fixes

pull/1369/head
Craig Tiller 10 years ago
parent a940752dee
commit c1f7560ac2
  1. 9
      src/core/surface/call.c
  2. 12
      src/core/transport/chttp2_transport.c

@ -687,7 +687,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
size_t i;
int unref = 0;
int unref_due_to_connection_close = 0;
lock(call);
call->receiving = 0;
if (success) {
@ -714,7 +714,7 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
unref = 1;
unref_due_to_connection_close = 1;
}
finish_read_ops(call);
} else {
@ -725,9 +725,11 @@ static void call_on_done_recv(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
}
call->recv_ops.nops = 0;
unlock(call);
if (unref) {
grpc_call_internal_unref(call, 0);
if (unref_due_to_connection_close) {
grpc_call_internal_unref(call, 0);
}
}
@ -798,6 +800,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {

@ -306,7 +306,6 @@ struct stream {
gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
gpr_uint8 published_close;
op_closure send_done_closure;
op_closure recv_done_closure;
@ -731,6 +730,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", t->is_client? "CLI" : "SVR",
s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@ -999,6 +1000,8 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client? "CLI" : "SVR", s, t->next_stream_id));
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@ -1018,6 +1021,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->write_state = WRITE_STATE_QUEUED_CLOSE;
}
if (s->id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New stream %p waiting for concurrency", t->is_client? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->outgoing_window > 0) {
@ -1300,7 +1304,8 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id,
t->is_client? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value)));
@ -1872,6 +1877,9 @@ static void finish_reads(transport *t) {
if (*s->publish_state != s->published_state) {
s->published_state = *s->publish_state;
publish = 1;
if (s->published_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
}
}
if (s->parser.incoming_sopb.nops > 0) {
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);

Loading…
Cancel
Save