From 756beae6bf0092c839a87e42ca3830dbdb9a4ca2 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 11 Dec 2015 16:43:28 -0800 Subject: [PATCH 1/2] Add test to check that all settings frames are acked --- tools/http2_interop/s6.5.go | 58 ++++++++++++++++++++++++++++++++ tools/http2_interop/s6.5_test.go | 11 ++++++ tools/http2_interop/settings.go | 4 +++ 3 files changed, 73 insertions(+) diff --git a/tools/http2_interop/s6.5.go b/tools/http2_interop/s6.5.go index 32468abe831..4295c46f73a 100644 --- a/tools/http2_interop/s6.5.go +++ b/tools/http2_interop/s6.5.go @@ -1,6 +1,7 @@ package http2interop import ( + "fmt" "time" ) @@ -30,3 +31,60 @@ func testSmallMaxFrameSize(ctx *HTTP2InteropCtx) error { return nil } + +// Section 6.5.3 says all settings frames must be acked. +func testAllSettingsFramesAcked(ctx *HTTP2InteropCtx) error { + conn, err := connect(ctx) + if err != nil { + return err + } + defer conn.Close() + conn.SetDeadline(time.Now().Add(defaultTimeout)) + + sf := &SettingsFrame{} + if err := http2Connect(conn, sf); err != nil { + return err + } + + // The spec says "The values in the SETTINGS frame MUST be processed in the order they + // appear. [...] Once all values have been processed, the recipient MUST immediately + // emit a SETTINGS frame with the ACK flag set." From my understanding, processing all + // of no values warrants an ack per frame. + for i := 0; i < 10; i++ { + if err := streamFrame(conn, sf); err != nil { + return err + } + } + + var settingsFramesReceived = 0 + // The server by default sends a settings frame as part of the handshake, and another + // after the receipt of the initial settings frame as part of our conneection preface. + // This means we expected 1 + 1 + 10 = 12 settings frames in return, with all but the + // first having the ack bit. + for settingsFramesReceived < 12 { + f, err := parseFrame(conn) + if err != nil { + return err + } + + // Other frames come down the wire too, including window update. Just ignore those. + if f, ok := f.(*SettingsFrame); ok { + settingsFramesReceived += 1 + if settingsFramesReceived == 1 { + if f.Header.Flags&SETTINGS_FLAG_ACK > 0 { + return fmt.Errorf("settings frame should not have used ack: %v") + } + continue + } + + if f.Header.Flags&SETTINGS_FLAG_ACK == 0 { + return fmt.Errorf("settings frame should have used ack: %v", f) + } + if len(f.Params) != 0 { + return fmt.Errorf("settings ack cannot have params: %v", f) + } + } + } + + return nil +} diff --git a/tools/http2_interop/s6.5_test.go b/tools/http2_interop/s6.5_test.go index 9dadd4e699c..063fd5664c8 100644 --- a/tools/http2_interop/s6.5_test.go +++ b/tools/http2_interop/s6.5_test.go @@ -13,3 +13,14 @@ func TestSoonSmallMaxFrameSize(t *testing.T) { err := testSmallMaxFrameSize(ctx) matchError(t, err, "Got goaway frame") } + +func TestSoonAllSettingsFramesAcked(t *testing.T) { + defer Report(t) + if *testCase != "framing" { + t.SkipNow() + } + ctx := InteropCtx(t) + if err := testAllSettingsFramesAcked(ctx); err != nil { + t.Fatal(err) + } +} diff --git a/tools/http2_interop/settings.go b/tools/http2_interop/settings.go index 97914d960fa..544cec01ee7 100644 --- a/tools/http2_interop/settings.go +++ b/tools/http2_interop/settings.go @@ -26,6 +26,10 @@ const ( SettingsMaxHeaderListSize SettingsIdentifier = 6 ) +const ( + SETTINGS_FLAG_ACK byte = 0x01 +) + func (si SettingsIdentifier) String() string { switch si { case SettingsHeaderTableSize: From abb2e3dc00da63431b22696145bde3538132ebd8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 15 Dec 2015 06:23:59 -0800 Subject: [PATCH 2/2] Handle cancelling writes whilst writing --- src/core/transport/chttp2/internal.h | 13 +++++++-- src/core/transport/chttp2/stream_lists.c | 20 ++++++++++++++ src/core/transport/chttp2_transport.c | 34 ++++++++++++++++++------ 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index fc35ea6f930..8bcb440c368 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -65,6 +65,7 @@ typedef enum { GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, /** streams that are waiting to start because there are too many concurrent streams on the connection */ @@ -283,6 +284,9 @@ struct grpc_chttp2_transport_parsing { gpr_slice goaway_text; gpr_int64 outgoing_window; + + /** pings awaiting responses */ + grpc_chttp2_outstanding_ping pings; }; struct grpc_chttp2_transport { @@ -391,8 +395,6 @@ typedef struct { gpr_uint8 write_closed; /** is this stream reading half-closed (boolean) */ gpr_uint8 read_closed; - /** is this stream finished closing (and reportably closed) */ - gpr_uint8 finished_close; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; /** has this stream seen an error? if 1, then pending incoming frames @@ -586,6 +588,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); +void grpc_chttp2_list_add_closed_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_closed_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); + grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index a4c85b4e574..49f951d08bf 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -353,6 +353,26 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( return r; } +void grpc_chttp2_list_add_closed_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING); +} + +int grpc_chttp2_list_pop_closed_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_stream *stream; + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING); + if (r != 0) { + *stream_global = &stream->global; + } + return r; +} + void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 47268ab5fc3..7793f7c9e4e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -134,6 +134,9 @@ static void connectivity_state_set( static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); +static void fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -625,6 +628,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, void *transport_writing_ptr, int success) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); + grpc_chttp2_stream_global *stream_global; GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0); @@ -638,6 +642,11 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { + fail_pending_writes(exec_ctx, stream_global); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + } + /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; @@ -1107,6 +1116,16 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } +static void fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_initial_metadata_finished, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + grpc_chttp2_complete_closure_step(exec_ctx, + &stream_global->send_message_finished, 0); +} + void grpc_chttp2_mark_stream_closed( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, int close_reads, @@ -1123,12 +1142,13 @@ void grpc_chttp2_mark_stream_closed( } if (close_writes && !stream_global->write_closed) { stream_global->write_closed = 1; - grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 0); - grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 0); - grpc_chttp2_complete_closure_step(exec_ctx, - &stream_global->send_message_finished, 0); + if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) { + GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); + grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, + stream_global); + } else { + fail_pending_writes(exec_ctx, stream_global); + } } if (stream_global->read_closed && stream_global->write_closed) { if (stream_global->id != 0 && @@ -1140,7 +1160,6 @@ void grpc_chttp2_mark_stream_closed( remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), stream_global->id); } - stream_global->finished_close = 1; GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } } @@ -1354,7 +1373,6 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); remove_stream(exec_ctx, t, stream_global->id); - stream_global->finished_close = 1; GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } }