Fix edge cases resulting in close not being sent

pull/2765/head
Craig Tiller 9 years ago
parent fb21ae6040
commit 0c23f29a29
  1. 6
      src/core/transport/chttp2/internal.h
  2. 30
      src/core/transport/chttp2/writing.c
  3. 2
      src/core/transport/chttp2_transport.c

@ -119,6 +119,10 @@ typedef enum {
GRPC_WRITE_STATE_SENT_CLOSE GRPC_WRITE_STATE_SENT_CLOSE
} grpc_chttp2_write_state; } grpc_chttp2_write_state;
/* flags that can be or'd into stream_global::writing_now */
#define GRPC_CHTTP2_WRITING_DATA 1
#define GRPC_CHTTP2_WRITING_WINDOW 2
typedef enum { typedef enum {
GRPC_DONT_SEND_CLOSED = 0, GRPC_DONT_SEND_CLOSED = 0,
GRPC_SEND_CLOSED, GRPC_SEND_CLOSED,
@ -382,7 +386,7 @@ typedef struct {
gpr_uint8 published_cancelled; gpr_uint8 published_cancelled;
/** is this stream in the stream map? (boolean) */ /** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map; gpr_uint8 in_stream_map;
/** is this stream actively being written? */ /** bitmask of GRPC_CHTTP2_WRITING_xxx above */
gpr_uint8 writing_now; gpr_uint8 writing_now;
/** stream state already published to the upper layer */ /** stream state already published to the upper layer */

@ -77,7 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->id = stream_global->id; stream_writing->id = stream_global->id;
stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
GPR_ASSERT(!stream_global->writing_now);
if (stream_global->outgoing_sopb) { if (stream_global->outgoing_sopb) {
window_delta = window_delta =
@ -123,11 +122,13 @@ int grpc_chttp2_unlocking_check_writes(
stream_global->unannounced_incoming_window = 0; stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global, grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global); stream_global);
stream_global->writing_now = 1; stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); }
} else if (stream_writing->sopb.nops > 0 || if (stream_writing->sopb.nops > 0 ||
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->writing_now = 1; stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
}
if (stream_global->writing_now != 0) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} }
} }
@ -183,6 +184,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor, stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf); &transport_writing->outbuf);
stream_writing->sopb.nops = 0;
} }
if (stream_writing->announce_window > 0) { if (stream_writing->announce_window > 0) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
@ -191,7 +193,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->id, stream_writing->announce_window)); stream_writing->id, stream_writing->announce_window));
stream_writing->announce_window = 0; stream_writing->announce_window = 0;
} }
stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf, gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(stream_writing->id, grpc_chttp2_rst_stream_create(stream_writing->id,
@ -215,20 +216,23 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream( while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) { transport_global, transport_writing, &stream_global, &stream_writing)) {
GPR_ASSERT(stream_global->writing_now); GPR_ASSERT(stream_global->writing_now != 0);
stream_global->writing_now = 0; if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
}
if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
if (stream_global->outgoing_sopb != NULL && if (stream_global->outgoing_sopb != NULL &&
stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL; stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global, grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1); stream_global->send_done_closure, 1);
} }
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
} }
stream_global->writing_now = 0;
grpc_chttp2_list_add_read_write_state_changed(transport_global, grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global); stream_global);
} }

@ -855,7 +855,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (!stream_global->publish_sopb) { if (!stream_global->publish_sopb) {
continue; continue;
} }
if (stream_global->writing_now) { if (stream_global->writing_now != 0) {
continue; continue;
} }
/* FIXME(ctiller): we include in_stream_map in our computation of /* FIXME(ctiller): we include in_stream_map in our computation of

Loading…
Cancel
Save