diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index ba9bdb57527..a21a7a4d759 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -35,6 +35,16 @@ #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #include "src/core/transport/transport_impl.h" +#include "src/core/iomgr/endpoint.h" +#include "src/core/transport/chttp2/frame_data.h" +#include "src/core/transport/chttp2/frame_goaway.h" +#include "src/core/transport/chttp2/frame_ping.h" +#include "src/core/transport/chttp2/frame_rst_stream.h" +#include "src/core/transport/chttp2/frame_settings.h" +#include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/stream_map.h" +#include "src/core/transport/chttp2/hpack_parser.h" +#include "src/core/transport/chttp2/stream_encoder.h" typedef struct grpc_chttp2_transport grpc_chttp2_transport; typedef struct grpc_chttp2_stream grpc_chttp2_stream; @@ -336,4 +346,8 @@ struct grpc_chttp2_stream { grpc_stream_op_buffer callback_sopb; }; +/** Someone is unlocking the transport mutex: check to see if writes + are required, and schedule them if so */ +void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); + #endif diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 2a0cf2562fe..a1830a8c253 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -32,3 +32,93 @@ */ #include "src/core/transport/chttp2/internal.h" + +#include + +static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) { + grpc_chttp2_stream *s; + gpr_uint32 window_delta; + + /* don't do anything if we are already writing */ + if (t->writing.executing) { + return; + } + + /* simple writes are queued to qbuf, and flushed here */ + gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf); + GPR_ASSERT(t->global.qbuf.count == 0); + + if (t->dirtied_local_settings && !t->sent_local_settings) { + gpr_slice_buffer_add( + &t->writing.outbuf, grpc_chttp2_settings_create( + t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], + t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); + t->force_send_settings = 0; + t->dirtied_local_settings = 0; + t->sent_local_settings = 1; + } + + /* for each grpc_chttp2_stream that's become writable, frame it's data (according to + available window sizes) and add to the output buffer */ + while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && + s->outgoing_window > 0) { + window_delta = grpc_chttp2_preencode( + s->outgoing_sopb->ops, &s->outgoing_sopb->nops, + GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb); + FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); + FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); + t->outgoing_window -= window_delta; + s->outgoing_window -= window_delta; + + if (s->write_state == WRITE_STATE_QUEUED_CLOSE && + s->outgoing_sopb->nops == 0) { + if (!t->is_client && !s->read_closed) { + s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM; + } else { + s->writing.send_closed = SEND_CLOSED; + } + } + if (s->writing.sopb.nops > 0 || s->writing.send_closed) { + stream_list_join(t, s, WRITING); + } + + /* we should either exhaust window or have no ops left, but not both */ + if (s->outgoing_sopb->nops == 0) { + s->outgoing_sopb = NULL; + schedule_cb(t, s->global.send_done_closure, 1); + } else if (s->outgoing_window) { + stream_list_add_tail(t, s, WRITABLE); + } + } + + if (!t->parsing.executing) { + /* for each grpc_chttp2_stream that wants to update its window, add that window here */ + while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { + window_delta = + t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + s->incoming_window; + if (!s->read_closed && window_delta) { + gpr_slice_buffer_add( + &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); + FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); + s->incoming_window += window_delta; + } + } + + /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ + if (t->incoming_window < t->connection_window_target * 3 / 4) { + window_delta = t->connection_window_target - t->incoming_window; + gpr_slice_buffer_add(&t->writing.outbuf, + grpc_chttp2_window_update_create(0, window_delta)); + FLOWCTL_TRACE(t, t, incoming, 0, window_delta); + t->incoming_window += window_delta; + } + } + + if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) { + t->writing.executing = 1; + ref_transport(t); + gpr_log(GPR_DEBUG, "schedule write"); + schedule_cb(t, &t->writing.action, 1); + } +} diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 77537875789..1cfbc07d971 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -39,17 +39,8 @@ #include "src/core/profiling/timers.h" #include "src/core/support/string.h" -#include "src/core/transport/chttp2/frame_data.h" -#include "src/core/transport/chttp2/frame_goaway.h" -#include "src/core/transport/chttp2/frame_ping.h" -#include "src/core/transport/chttp2/frame_rst_stream.h" -#include "src/core/transport/chttp2/frame_settings.h" -#include "src/core/transport/chttp2/frame_window_update.h" -#include "src/core/transport/chttp2/hpack_parser.h" #include "src/core/transport/chttp2/http2_errors.h" #include "src/core/transport/chttp2/status_conversion.h" -#include "src/core/transport/chttp2/stream_encoder.h" -#include "src/core/transport/chttp2/stream_map.h" #include "src/core/transport/chttp2/timeout_encoding.h" #include "src/core/transport/chttp2/internal.h" #include "src/core/transport/transport_impl.h" @@ -90,7 +81,6 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, static void lock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t); -static void unlock_check_writes(grpc_chttp2_transport* t); static void unlock_check_cancellations(grpc_chttp2_transport* t); static void unlock_check_parser(grpc_chttp2_transport* t); static void unlock_check_channel_callbacks(grpc_chttp2_transport* t); @@ -535,7 +525,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; - unlock_check_writes(t); + grpc_chttp2_unlocking_check_writes(t); unlock_check_cancellations(t); unlock_check_parser(t); unlock_check_channel_callbacks(t); @@ -571,94 +561,6 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -static void unlock_check_writes(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - gpr_uint32 window_delta; - - /* don't do anything if we are already writing */ - if (t->writing.executing) { - return; - } - - /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf); - GPR_ASSERT(t->global.qbuf.count == 0); - - if (t->dirtied_local_settings && !t->sent_local_settings) { - gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_settings_create( - t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], - t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); - t->force_send_settings = 0; - t->dirtied_local_settings = 0; - t->sent_local_settings = 1; - } - - /* for each grpc_chttp2_stream that's become writable, frame it's data (according to - available window sizes) and add to the output buffer */ - while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && - s->outgoing_window > 0) { - window_delta = grpc_chttp2_preencode( - s->outgoing_sopb->ops, &s->outgoing_sopb->nops, - GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb); - FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); - FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); - t->outgoing_window -= window_delta; - s->outgoing_window -= window_delta; - - if (s->write_state == WRITE_STATE_QUEUED_CLOSE && - s->outgoing_sopb->nops == 0) { - if (!t->is_client && !s->read_closed) { - s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM; - } else { - s->writing.send_closed = SEND_CLOSED; - } - } - if (s->writing.sopb.nops > 0 || s->writing.send_closed) { - stream_list_join(t, s, WRITING); - } - - /* we should either exhaust window or have no ops left, but not both */ - if (s->outgoing_sopb->nops == 0) { - s->outgoing_sopb = NULL; - schedule_cb(t, s->global.send_done_closure, 1); - } else if (s->outgoing_window) { - stream_list_add_tail(t, s, WRITABLE); - } - } - - if (!t->parsing.executing) { - /* for each grpc_chttp2_stream that wants to update its window, add that window here */ - while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; - } - } - - /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - window_delta = t->connection_window_target - t->incoming_window; - gpr_slice_buffer_add(&t->writing.outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - FLOWCTL_TRACE(t, t, incoming, 0, window_delta); - t->incoming_window += window_delta; - } - } - - if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) { - t->writing.executing = 1; - ref_transport(t); - gpr_log(GPR_DEBUG, "schedule write"); - schedule_cb(t, &t->writing.action, 1); - } -} - static void writing_finalize_outbuf(grpc_chttp2_transport *t) { grpc_chttp2_stream *s;