From 04d8bc2c9b043efa1529d06e2f4c3052b8953357 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 6 Nov 2017 19:49:46 -0800 Subject: [PATCH 01/16] First pass at abstract base flow control class --- .../chttp2/transport/chttp2_transport.cc | 8 +- .../transport/chttp2/transport/flow_control.h | 95 +++++++++++-------- .../ext/transport/chttp2/transport/internal.h | 8 +- 3 files changed, 70 insertions(+), 41 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 63ac65ac788..a239c6ee4be 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -544,7 +544,8 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } - t->flow_control.Init(exec_ctx, t, enable_bdp); + t->flow_control.Init(exec_ctx, t, + enable_bdp); /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; @@ -716,7 +717,10 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, post_destructive_reclaimer(exec_ctx, t); } - s->flow_control.Init(t->flow_control.get(), s); + s->flow_control.Init( + static_cast( + t->flow_control.get()), + s); GPR_TIMER_END("init_stream", 0); return 0; diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 2515c94309a..3d9f3a181f8 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -24,6 +24,7 @@ #include #include "src/core/ext/transport/chttp2/transport/http2_settings.h" +#include "src/core/lib/support/abstract.h" #include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/pid_controller.h" @@ -132,7 +133,30 @@ class FlowControlTrace { int64_t announced_window_delta_; }; -class TransportFlowControl { +class TransportFlowControlBase { + public: + TransportFlowControlBase() {} + virtual ~TransportFlowControlBase() {} + virtual uint32_t MaybeSendUpdate(bool writing_anyway) {abort();} + virtual FlowControlAction MakeAction() {abort();} + virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) {abort();} + virtual void StreamSentData(int64_t size) {abort();} + virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();} + virtual void RecvUpdate(uint32_t size) {abort();} + virtual BdpEstimator* bdp_estimator() { return nullptr; } + int64_t remote_window() const { return remote_window_; } + virtual int64_t target_window() const { return target_initial_window_size_; } + int64_t announced_window() const{ return announced_window_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + int64_t remote_window_ = kDefaultWindow; + int64_t target_initial_window_size_ = kDefaultWindow; + int64_t announced_window_ = kDefaultWindow; +}; + +class TransportFlowControl final : public TransportFlowControlBase { public: TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, bool enable_bdp_probe); @@ -144,25 +168,25 @@ class TransportFlowControl { // else returns zero; writing_anyway indicates if a write would happen // regardless of the send - if it is false and this function returns non-zero, // this announce will cause a write to occur - uint32_t MaybeSendUpdate(bool writing_anyway); + uint32_t MaybeSendUpdate(bool writing_anyway) override; // Reads the flow control data and returns and actionable struct that will // tell chttp2 exactly what it needs to do - FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); } + FlowControlAction MakeAction() override { return UpdateAction(FlowControlAction()); } // Call periodically (at a low-ish rate, 100ms - 10s makes sense) // to perform more complex flow control calculations and return an action // to let chttp2 change its parameters - FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx); + FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override; - void StreamSentData(int64_t size) { remote_window_ -= size; } + void StreamSentData(int64_t size) override { remote_window_ -= size; } grpc_error* ValidateRecvData(int64_t incoming_frame_size); void CommitRecvData(int64_t incoming_frame_size) { announced_window_ -= incoming_frame_size; } - grpc_error* RecvData(int64_t incoming_frame_size) { + grpc_error* RecvData(int64_t incoming_frame_size) override { FlowControlTrace trace(" data recv", this, nullptr); grpc_error* error = ValidateRecvData(incoming_frame_size); if (error != GRPC_ERROR_NONE) return error; @@ -171,18 +195,16 @@ class TransportFlowControl { } // we have received a WINDOW_UPDATE frame for a transport - void RecvUpdate(uint32_t size) { + void RecvUpdate(uint32_t size) override { FlowControlTrace trace("t updt recv", this, nullptr); remote_window_ += size; } - int64_t remote_window() const { return remote_window_; } - int64_t target_window() const { + int64_t target_window() const override { return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), announced_stream_total_over_incoming_window_ + target_initial_window_size_); } - int64_t announced_window() const { return announced_window_; } const grpc_chttp2_transport* transport() const { return t_; } @@ -202,7 +224,7 @@ class TransportFlowControl { } } - BdpEstimator* bdp_estimator() { return &bdp_estimator_; } + BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } void TestOnlyForceHugeWindow() { announced_window_ = 1024 * 1024 * 1024; @@ -226,9 +248,6 @@ class TransportFlowControl { const grpc_chttp2_transport* const t_; - /** Our bookkeeping for the remote peer's available window */ - int64_t remote_window_ = kDefaultWindow; - /** calculating what we should give for local window: we track the total amount of flow control over initial window size across all streams: this is data that we want to receive right now (it @@ -240,13 +259,6 @@ class TransportFlowControl { int64_t announced_stream_total_over_incoming_window_ = 0; int64_t announced_stream_total_under_incoming_window_ = 0; - /** This is out window according to what we have sent to our remote peer. The - * difference between this and target window is what we use to decide when - * to send WINDOW_UPDATE frames. */ - int64_t announced_window_ = kDefaultWindow; - - int32_t target_initial_window_size_ = kDefaultWindow; - /** should we probe bdp? */ const bool enable_bdp_probe_; @@ -258,7 +270,31 @@ class TransportFlowControl { grpc_millis last_pid_update_ = 0; }; -class StreamFlowControl { +class StreamFlowControlBase { + public: + StreamFlowControlBase() {} + virtual ~StreamFlowControlBase() {} + virtual FlowControlAction UpdateAction(FlowControlAction action) {abort();} + virtual FlowControlAction MakeAction() {abort();} + virtual void SentData(int64_t outgoing_frame_size) {abort();} + virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();} + virtual uint32_t MaybeSendUpdate() {abort();} + virtual void RecvUpdate(uint32_t size) {abort();} + virtual void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) {abort();} + int64_t remote_window_delta() { return remote_window_delta_; } + int64_t local_window_delta() { return local_window_delta_; } + int64_t announced_window_delta() { return announced_window_delta_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + int64_t remote_window_delta_ = 0; + int64_t local_window_delta_ = 0; + int64_t announced_window_delta_ = 0; +}; + +class StreamFlowControl final : public StreamFlowControlBase { public: StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); ~StreamFlowControl() { @@ -314,21 +350,6 @@ class StreamFlowControl { announced_window_delta_ += change; tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); } - - /** window available for us to send to peer, over or under the initial - * window - * size of the transport... ie: - * remote_window = remote_window_delta + transport.initial_window_size */ - int64_t remote_window_delta_ = 0; - - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * local_window = local_window_delta + transport.initial_window_size */ - int64_t local_window_delta_ = 0; - - /** window available for peer to send to us over this stream that we have - * announced to the peer */ - int64_t announced_window_delta_ = 0; }; } // namespace chttp2 diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index f6fd6795d0a..0441f1c5413 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -351,7 +351,9 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - grpc_core::ManualConstructor + grpc_core::PolymorphicManualConstructor< + grpc_core::chttp2::TransportFlowControlBase, + grpc_core::chttp2::TransportFlowControl> flow_control; /** initial window change. This is tracked as we parse settings frames from * the remote peer. If there is a positive delta, then we will make all @@ -526,7 +528,9 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - grpc_core::ManualConstructor + grpc_core::PolymorphicManualConstructor< + grpc_core::chttp2::StreamFlowControlBase, + grpc_core::chttp2::StreamFlowControl> flow_control; grpc_slice_buffer flow_controlled_buffer; From 7ccb79bfbd5afb9ba100643581157486b8510483 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 27 Nov 2017 10:22:44 -0500 Subject: [PATCH 02/16] First pass at mocked out Flow Control classes --- .../chttp2/transport/chttp2_transport.cc | 37 ++++++++-- .../transport/chttp2/transport/flow_control.h | 73 +++++++++++++++---- .../ext/transport/chttp2/transport/internal.h | 6 +- 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index a239c6ee4be..70b30b7db20 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -544,8 +544,22 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } - t->flow_control.Init(exec_ctx, t, - enable_bdp); + // Tune the heck out of this + const uint32_t kFrameSize = 1024 * 1024; + + if (true /* disable flow control*/) { + t->flow_control.Init(); + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + enable_bdp = false; + } else { + t->flow_control.Init(exec_ctx, t, + enable_bdp); + } /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; @@ -717,10 +731,14 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, post_destructive_reclaimer(exec_ctx, t); } - s->flow_control.Init( - static_cast( - t->flow_control.get()), - s); + if (true /* disable flow control */) { + s->flow_control.Init(); + } else { + s->flow_control.Init( + static_cast( + t->flow_control.get()), + s); + } GPR_TIMER_END("init_stream", 0); return 0; @@ -2518,8 +2536,11 @@ static void read_action_locked(grpc_exec_ctx* exec_ctx, void* tp, grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - t->flow_control->bdp_estimator()->AddIncomingBytes( - (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); + grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator(); + if (bdp_est) { + bdp_est->AddIncomingBytes( + (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); + } errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); } diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 3d9f3a181f8..d28e31d4030 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -137,16 +137,17 @@ class TransportFlowControlBase { public: TransportFlowControlBase() {} virtual ~TransportFlowControlBase() {} - virtual uint32_t MaybeSendUpdate(bool writing_anyway) {abort();} - virtual FlowControlAction MakeAction() {abort();} - virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) {abort();} - virtual void StreamSentData(int64_t size) {abort();} - virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();} - virtual void RecvUpdate(uint32_t size) {abort();} + virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } + virtual FlowControlAction MakeAction() { abort(); } + virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); } + virtual void StreamSentData(int64_t size) { abort(); } + virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + virtual void RecvUpdate(uint32_t size) { abort(); } + // TODO(ncteisen): maybe completely encapsulate this inside FlowControl virtual BdpEstimator* bdp_estimator() { return nullptr; } int64_t remote_window() const { return remote_window_; } virtual int64_t target_window() const { return target_initial_window_size_; } - int64_t announced_window() const{ return announced_window_; } + int64_t announced_window() const { return announced_window_; } GRPC_ABSTRACT_BASE_CLASS @@ -156,6 +157,28 @@ class TransportFlowControlBase { int64_t announced_window_ = kDefaultWindow; }; +const int64_t kMaxWindow = (int64_t)((1u << 31) - 1); + +class TransportFlowControlDisabled final : public TransportFlowControlBase { + public: + TransportFlowControlDisabled() { + remote_window_ = kMaxWindow; + target_initial_window_size_ = kMaxWindow; + announced_window_ = kMaxWindow; + } + virtual uint32_t MaybeSendUpdate(bool writing_anyway) { return 0; } + virtual FlowControlAction MakeAction() { return FlowControlAction(); } + virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { + return FlowControlAction(); + } + virtual void StreamSentData(int64_t size) {} + virtual grpc_error* RecvData(int64_t incoming_frame_size) { + return GRPC_ERROR_NONE; + } + virtual void RecvUpdate(uint32_t size) {} + virtual int64_t target_window() const { return kMaxWindow; } +}; + class TransportFlowControl final : public TransportFlowControlBase { public: TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, @@ -172,7 +195,9 @@ class TransportFlowControl final : public TransportFlowControlBase { // Reads the flow control data and returns and actionable struct that will // tell chttp2 exactly what it needs to do - FlowControlAction MakeAction() override { return UpdateAction(FlowControlAction()); } + FlowControlAction MakeAction() override { + return UpdateAction(FlowControlAction()); + } // Call periodically (at a low-ish rate, 100ms - 10s makes sense) // to perform more complex flow control calculations and return an action @@ -274,14 +299,16 @@ class StreamFlowControlBase { public: StreamFlowControlBase() {} virtual ~StreamFlowControlBase() {} - virtual FlowControlAction UpdateAction(FlowControlAction action) {abort();} - virtual FlowControlAction MakeAction() {abort();} - virtual void SentData(int64_t outgoing_frame_size) {abort();} - virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();} - virtual uint32_t MaybeSendUpdate() {abort();} - virtual void RecvUpdate(uint32_t size) {abort();} + virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } + virtual FlowControlAction MakeAction() { abort(); } + virtual void SentData(int64_t outgoing_frame_size) { abort(); } + virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + virtual uint32_t MaybeSendUpdate() { abort(); } + virtual void RecvUpdate(uint32_t size) { abort(); } virtual void IncomingByteStreamUpdate(size_t max_size_hint, - size_t have_already) {abort();} + size_t have_already) { + abort(); + } int64_t remote_window_delta() { return remote_window_delta_; } int64_t local_window_delta() { return local_window_delta_; } int64_t announced_window_delta() { return announced_window_delta_; } @@ -294,6 +321,22 @@ class StreamFlowControlBase { int64_t announced_window_delta_ = 0; }; +class StreamFlowControlDisabled : public StreamFlowControlBase { + public: + virtual FlowControlAction UpdateAction(FlowControlAction action) { + return action; + } + virtual FlowControlAction MakeAction() { return FlowControlAction(); } + virtual void SentData(int64_t outgoing_frame_size) {} + virtual grpc_error* RecvData(int64_t incoming_frame_size) { + return GRPC_ERROR_NONE; + } + virtual uint32_t MaybeSendUpdate() { return 0; } + virtual void RecvUpdate(uint32_t size) {} + virtual void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) {} +}; + class StreamFlowControl final : public StreamFlowControlBase { public: StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0441f1c5413..89214a601c8 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -353,7 +353,8 @@ struct grpc_chttp2_transport { grpc_core::PolymorphicManualConstructor< grpc_core::chttp2::TransportFlowControlBase, - grpc_core::chttp2::TransportFlowControl> + grpc_core::chttp2::TransportFlowControl, + grpc_core::chttp2::TransportFlowControlDisabled> flow_control; /** initial window change. This is tracked as we parse settings frames from * the remote peer. If there is a positive delta, then we will make all @@ -530,7 +531,8 @@ struct grpc_chttp2_stream { grpc_core::PolymorphicManualConstructor< grpc_core::chttp2::StreamFlowControlBase, - grpc_core::chttp2::StreamFlowControl> + grpc_core::chttp2::StreamFlowControl, + grpc_core::chttp2::StreamFlowControlDisabled> flow_control; grpc_slice_buffer flow_controlled_buffer; From 99849886eb260af69eb948014c3a49bb2351112b Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 27 Nov 2017 15:37:13 -0800 Subject: [PATCH 03/16] Disable flow control settings frames --- src/core/ext/transport/chttp2/transport/frame_settings.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index de4340fea50..1fd3a66463a 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -187,6 +187,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p, if (grpc_wire_id_to_setting_id(parser->id, &id)) { const grpc_chttp2_setting_parameters* sp = &grpc_chttp2_settings_parameters[id]; + if ((id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE || + id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE) && + true /* disable flow conrtol */) { + continue; + } if (parser->value < sp->min_value || parser->value > sp->max_value) { switch (sp->invalid_value_behavior) { case GRPC_CHTTP2_CLAMP_INVALID_VALUE: From 15c32cd4813e20cac65e9c2a8f9cea4e1d483a05 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 5 Dec 2017 12:57:49 -0800 Subject: [PATCH 04/16] Fix parsing check --- .../chttp2/transport/chttp2_transport.cc | 18 +++++++++++++----- .../ext/transport/chttp2/transport/parsing.cc | 7 ++++--- .../transport/chttp2/transport/stream_lists.cc | 2 ++ test/core/end2end/tests/payload.cc | 3 +++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 70b30b7db20..233ea7f6a6f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -338,11 +338,13 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } t->dirtied_local_settings = 1; - /* Hack: it's common for implementations to assume 65536 bytes initial send - window -- this should by rights be 0 */ - t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - t->sent_local_settings = 0; - t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; + if (!true /*diable flow control */) { + /* Hack: it's common for implementations to assume 65536 bytes initial send + window -- this should by rights be 0 */ + t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + t->sent_local_settings = 0; + t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; + } if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -555,6 +557,12 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, kFrameSize; t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = kFrameSize; + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + grpc_core::chttp2::kMaxWindow; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + grpc_core::chttp2::kMaxWindow; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + grpc_core::chttp2::kMaxWindow; enable_bdp = false; } else { t->flow_control.Init(exec_ctx, t, diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 46ec3fbaa61..dce6aa42276 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -197,9 +197,10 @@ grpc_error* grpc_chttp2_perform_read(grpc_exec_ctx* exec_ctx, return GRPC_ERROR_NONE; } goto dts_fh_0; /* loop */ - } else if (t->incoming_frame_size > - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { + } else if (!true /*disable flow control*/ && + t->incoming_frame_size > + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { char* msg; gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d", t->incoming_frame_size, diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc index c95d02541a7..c6e2bada337 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.cc +++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc @@ -183,6 +183,7 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { + GPR_ASSERT(!true /*flow control disabled */); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } @@ -198,6 +199,7 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { + GPR_ASSERT(!true /*flow control disabled */); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } diff --git a/test/core/end2end/tests/payload.cc b/test/core/end2end/tests/payload.cc index 2e9513b9cbe..8c3fd1b668a 100644 --- a/test/core/end2end/tests/payload.cc +++ b/test/core/end2end/tests/payload.cc @@ -125,6 +125,7 @@ static void request_response_with_payload(grpc_end2end_test_config config, grpc_byte_buffer* response_payload_recv = nullptr; grpc_call_details call_details; grpc_status_code status; + const char* error_string = nullptr; grpc_call_error error; grpc_slice details; int was_cancelled = 2; @@ -172,6 +173,7 @@ static void request_response_with_payload(grpc_end2end_test_config config, op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.error_string = &error_string; op->flags = 0; op->reserved = nullptr; op++; @@ -230,6 +232,7 @@ static void request_response_with_payload(grpc_end2end_test_config config, CQ_EXPECT_COMPLETION(cqv, tag(1), 1); cq_verify(cqv); + gpr_log(GPR_ERROR, "%s", error_string); GPR_ASSERT(status == GRPC_STATUS_OK); GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); From 3ca7a087a9a9c77f9e95e132835bc422a01da5ef Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 5 Dec 2017 13:08:34 -0800 Subject: [PATCH 05/16] Fix buildtests compile --- .../transport/chttp2/transport/flow_control.h | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index d28e31d4030..79d2dec9400 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -148,6 +148,7 @@ class TransportFlowControlBase { int64_t remote_window() const { return remote_window_; } virtual int64_t target_window() const { return target_initial_window_size_; } int64_t announced_window() const { return announced_window_; } + virtual void TestOnlyForceHugeWindow() {} GRPC_ABSTRACT_BASE_CLASS @@ -166,17 +167,17 @@ class TransportFlowControlDisabled final : public TransportFlowControlBase { target_initial_window_size_ = kMaxWindow; announced_window_ = kMaxWindow; } - virtual uint32_t MaybeSendUpdate(bool writing_anyway) { return 0; } - virtual FlowControlAction MakeAction() { return FlowControlAction(); } - virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { + uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } + FlowControlAction MakeAction() override { return FlowControlAction(); } + FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override { return FlowControlAction(); } - virtual void StreamSentData(int64_t size) {} - virtual grpc_error* RecvData(int64_t incoming_frame_size) { + void StreamSentData(int64_t size) override {} + grpc_error* RecvData(int64_t incoming_frame_size) override { return GRPC_ERROR_NONE; } - virtual void RecvUpdate(uint32_t size) {} - virtual int64_t target_window() const { return kMaxWindow; } + void RecvUpdate(uint32_t size) override {} + int64_t target_window() const override { return kMaxWindow; } }; class TransportFlowControl final : public TransportFlowControlBase { @@ -251,7 +252,7 @@ class TransportFlowControl final : public TransportFlowControlBase { BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } - void TestOnlyForceHugeWindow() { + void TestOnlyForceHugeWindow() override { announced_window_ = 1024 * 1024 * 1024; remote_window_ = 1024 * 1024 * 1024; } @@ -309,6 +310,7 @@ class StreamFlowControlBase { size_t have_already) { abort(); } + virtual void TestOnlyForceHugeWindow() {} int64_t remote_window_delta() { return remote_window_delta_; } int64_t local_window_delta() { return local_window_delta_; } int64_t announced_window_delta() { return announced_window_delta_; } @@ -323,18 +325,18 @@ class StreamFlowControlBase { class StreamFlowControlDisabled : public StreamFlowControlBase { public: - virtual FlowControlAction UpdateAction(FlowControlAction action) { + FlowControlAction UpdateAction(FlowControlAction action) override { return action; } - virtual FlowControlAction MakeAction() { return FlowControlAction(); } - virtual void SentData(int64_t outgoing_frame_size) {} - virtual grpc_error* RecvData(int64_t incoming_frame_size) { + FlowControlAction MakeAction() override { return FlowControlAction(); } + void SentData(int64_t outgoing_frame_size) override {} + grpc_error* RecvData(int64_t incoming_frame_size) override { return GRPC_ERROR_NONE; } - virtual uint32_t MaybeSendUpdate() { return 0; } - virtual void RecvUpdate(uint32_t size) {} - virtual void IncomingByteStreamUpdate(size_t max_size_hint, - size_t have_already) {} + uint32_t MaybeSendUpdate() override { return 0; } + void RecvUpdate(uint32_t size) override {} + void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) override {} }; class StreamFlowControl final : public StreamFlowControlBase { @@ -344,32 +346,35 @@ class StreamFlowControl final : public StreamFlowControlBase { tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); } - FlowControlAction UpdateAction(FlowControlAction action); - FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); } + FlowControlAction UpdateAction(FlowControlAction action) override; + FlowControlAction MakeAction() override { + return UpdateAction(tfc_->MakeAction()); + } // we have sent data on the wire, we must track this in our bookkeeping for // the remote peer's flow control. - void SentData(int64_t outgoing_frame_size) { + void SentData(int64_t outgoing_frame_size) override { FlowControlTrace tracer(" data sent", tfc_, this); tfc_->StreamSentData(outgoing_frame_size); remote_window_delta_ -= outgoing_frame_size; } // we have received data from the wire - grpc_error* RecvData(int64_t incoming_frame_size); + grpc_error* RecvData(int64_t incoming_frame_size) override; // returns an announce if we should send a stream update to our peer, else // returns zero - uint32_t MaybeSendUpdate(); + uint32_t MaybeSendUpdate() override; // we have received a WINDOW_UPDATE frame for a stream - void RecvUpdate(uint32_t size) { + void RecvUpdate(uint32_t size) override { FlowControlTrace trace("s updt recv", tfc_, this); remote_window_delta_ += size; } // the application is asking for a certain amount of bytes - void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already); + void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) override; int64_t remote_window_delta() const { return remote_window_delta_; } int64_t local_window_delta() const { return local_window_delta_; } @@ -377,7 +382,7 @@ class StreamFlowControl final : public StreamFlowControlBase { const grpc_chttp2_stream* stream() const { return s_; } - void TestOnlyForceHugeWindow() { + void TestOnlyForceHugeWindow() override { announced_window_delta_ = 1024 * 1024 * 1024; local_window_delta_ = 1024 * 1024 * 1024; remote_window_delta_ = 1024 * 1024 * 1024; From 2fe86668b43a5a1f74b4ab31a03e47f8fb4ca5c5 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 5 Dec 2017 13:27:17 -0800 Subject: [PATCH 06/16] Fix bm_trickle_compile --- src/core/ext/transport/chttp2/transport/flow_control.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 79d2dec9400..fa7f716fbe0 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -153,6 +153,7 @@ class TransportFlowControlBase { GRPC_ABSTRACT_BASE_CLASS protected: + friend class ::grpc::testing::TrickledCHTTP2; int64_t remote_window_ = kDefaultWindow; int64_t target_initial_window_size_ = kDefaultWindow; int64_t announced_window_ = kDefaultWindow; @@ -258,7 +259,6 @@ class TransportFlowControl final : public TransportFlowControlBase { } private: - friend class ::grpc::testing::TrickledCHTTP2; double TargetLogBdp(); double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value); FlowControlAction::Urgency DeltaUrgency(int32_t value, @@ -318,6 +318,7 @@ class StreamFlowControlBase { GRPC_ABSTRACT_BASE_CLASS protected: + friend class ::grpc::testing::TrickledCHTTP2; int64_t remote_window_delta_ = 0; int64_t local_window_delta_ = 0; int64_t announced_window_delta_ = 0; @@ -389,7 +390,6 @@ class StreamFlowControl final : public StreamFlowControlBase { } private: - friend class ::grpc::testing::TrickledCHTTP2; TransportFlowControl* const tfc_; const grpc_chttp2_stream* const s_; From 78478d0e9e84e0b95c9f0e914d3f80a76bec6cd2 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 5 Dec 2017 18:05:56 -0800 Subject: [PATCH 07/16] Fix write_buffering_test --- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 233ea7f6a6f..1375a0ca131 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -343,8 +343,8 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, window -- this should by rights be 0 */ t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; - t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; } + t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( From 66a72bba692f5b72af65a4b2a80bf45d93ccdfda Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 5 Dec 2017 18:09:07 -0800 Subject: [PATCH 08/16] Undo debugging output --- test/core/end2end/tests/payload.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/core/end2end/tests/payload.cc b/test/core/end2end/tests/payload.cc index 8c3fd1b668a..2e9513b9cbe 100644 --- a/test/core/end2end/tests/payload.cc +++ b/test/core/end2end/tests/payload.cc @@ -125,7 +125,6 @@ static void request_response_with_payload(grpc_end2end_test_config config, grpc_byte_buffer* response_payload_recv = nullptr; grpc_call_details call_details; grpc_status_code status; - const char* error_string = nullptr; grpc_call_error error; grpc_slice details; int was_cancelled = 2; @@ -173,7 +172,6 @@ static void request_response_with_payload(grpc_end2end_test_config config, op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; - op->data.recv_status_on_client.error_string = &error_string; op->flags = 0; op->reserved = nullptr; op++; @@ -232,7 +230,6 @@ static void request_response_with_payload(grpc_end2end_test_config config, CQ_EXPECT_COMPLETION(cqv, tag(1), 1); cq_verify(cqv); - gpr_log(GPR_ERROR, "%s", error_string); GPR_ASSERT(status == GRPC_STATUS_OK); GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); From 734256a5d09f491459b611361afcb88be0d209a8 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 8 Dec 2017 12:37:16 -0800 Subject: [PATCH 09/16] Add comments --- .../transport/chttp2/transport/flow_control.h | 89 ++++++++++++++++++- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index fa7f716fbe0..8e05b0c08c2 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -44,10 +44,15 @@ namespace grpc_core { namespace chttp2 { static constexpr uint32_t kDefaultWindow = 65535; +static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1); class TransportFlowControl; class StreamFlowControl; + +// Encapsulates a collections of actions the transport needs to take with +// regard to flow control. Each action comes with urgencies that tell the +// transport how quickly the action must take place. class FlowControlAction { public: enum class Urgency : uint8_t { @@ -133,21 +138,53 @@ class FlowControlTrace { int64_t announced_window_delta_; }; +// Fat interface with all methods a flow control implementation needs to +// support. gRPC C Core does not support pure virtual functions, so instead +// we abort in any methods which require implementation in the base class. class TransportFlowControlBase { public: TransportFlowControlBase() {} virtual ~TransportFlowControlBase() {} + + // Is flow control enabled? This is needed in other codepaths like the checks + // in parsing and in writing. + virtual bool flow_control_enabled() const { abort(); } + + // Called to check if the transport needs to send a WINDOW_UPDATE frame virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } + + // Using the protected members, returns and Action to be taken by the + // tranport. virtual FlowControlAction MakeAction() { abort(); } + + // Using the protected members, returns and Action to be taken by the + // tranport. Also checks for updates to our BDP estimate and acts + // accordingly. virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); } + + // Called to do bookkeeping when a stream owned by this transport sends + // data on the wire virtual void StreamSentData(int64_t size) { abort(); } + + // Called to do bookkeeping when a stream owned by this transport receives + // data from the wire. Also does error checking for frame size. virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + + // Called to do bookkeeping when we receive a WINDOW_UPDATE frame. virtual void RecvUpdate(uint32_t size) { abort(); } - // TODO(ncteisen): maybe completely encapsulate this inside FlowControl + + // Returns the BdpEstimator held by this object. Caller is responsible for + // checking for nullptr. TODO(ncteisen): consider fully encapsulating all + // bdp estimator actions inside TransportFlowControl virtual BdpEstimator* bdp_estimator() { return nullptr; } + + // Getters int64_t remote_window() const { return remote_window_; } virtual int64_t target_window() const { return target_initial_window_size_; } int64_t announced_window() const { return announced_window_; } + + // Used in certain benchmarks in which we don't want FlowControl to be a + // factor virtual void TestOnlyForceHugeWindow() {} GRPC_ABSTRACT_BASE_CLASS @@ -159,15 +196,23 @@ class TransportFlowControlBase { int64_t announced_window_ = kDefaultWindow; }; -const int64_t kMaxWindow = (int64_t)((1u << 31) - 1); - +// Implementation of flow control that does NOTHING. Always returns maximum +// values, never initiates writes, and assumes that the remote peer is doing +// the same. To be used to narrow down on flow control as the cause of negative +// performance. class TransportFlowControlDisabled final : public TransportFlowControlBase { public: + + // Maxes out all values TransportFlowControlDisabled() { remote_window_ = kMaxWindow; target_initial_window_size_ = kMaxWindow; announced_window_ = kMaxWindow; } + + bool flow_control_enabled() const override { return false; } + + // Never do anything. uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } FlowControlAction MakeAction() override { return FlowControlAction(); } FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override { @@ -178,15 +223,18 @@ class TransportFlowControlDisabled final : public TransportFlowControlBase { return GRPC_ERROR_NONE; } void RecvUpdate(uint32_t size) override {} - int64_t target_window() const override { return kMaxWindow; } }; +// Implementation of flow control that abides to HTTP/2 spec and attempts +// to be as performant as possible. class TransportFlowControl final : public TransportFlowControlBase { public: TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, bool enable_bdp_probe); ~TransportFlowControl() {} + bool flow_control_enabled() const override { return true; } + bool bdp_probe() const { return enable_bdp_probe_; } // returns an announce if we should send a transport update to our peer, @@ -227,6 +275,8 @@ class TransportFlowControl final : public TransportFlowControlBase { remote_window_ += size; } + // See comment above announced_stream_total_over_incoming_window_ for the + // logic behind this decision. int64_t target_window() const override { return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), announced_stream_total_over_incoming_window_ + @@ -296,21 +346,46 @@ class TransportFlowControl final : public TransportFlowControlBase { grpc_millis last_pid_update_ = 0; }; +// Fat interface with all methods a stream flow control implementation needs +// to support. gRPC C Core does not support pure virtual functions, so instead +// we abort in any methods which require implementation in the base class. class StreamFlowControlBase { public: StreamFlowControlBase() {} virtual ~StreamFlowControlBase() {} + + // Updates an action using the protected members. virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } + + // Using the protected members, returns an Action for this stream to be + // taken by the tranport. virtual FlowControlAction MakeAction() { abort(); } + + // Bookkeeping for when data is sent on this stream. virtual void SentData(int64_t outgoing_frame_size) { abort(); } + + // Bookkeeping and error checking for when data is received by this stream. virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + + // Called to check if this stream needs to send a WINDOW_UPDATE frame. virtual uint32_t MaybeSendUpdate() { abort(); } + + // Bookkeeping for receiving a WINDOW_UPDATE from for this stream. virtual void RecvUpdate(uint32_t size) { abort(); } + + // Bookkeeping for when a call pulls bytes out of the transport. At this + // point we consider the data 'used' and can thus let out peer know we are + // ready for more data. virtual void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already) { abort(); } + + // Used in certain benchmarks in which we don't want FlowControl to be a + // factor virtual void TestOnlyForceHugeWindow() {} + + // Getters int64_t remote_window_delta() { return remote_window_delta_; } int64_t local_window_delta() { return local_window_delta_; } int64_t announced_window_delta() { return announced_window_delta_; } @@ -324,6 +399,10 @@ class StreamFlowControlBase { int64_t announced_window_delta_ = 0; }; +// Implementation of flow control that does NOTHING. Always returns maximum +// values, never initiates writes, and assumes that the remote peer is doing +// the same. To be used to narrow down on flow control as the cause of negative +// performance. class StreamFlowControlDisabled : public StreamFlowControlBase { public: FlowControlAction UpdateAction(FlowControlAction action) override { @@ -340,6 +419,8 @@ class StreamFlowControlDisabled : public StreamFlowControlBase { size_t have_already) override {} }; +// Implementation of flow control that abides to HTTP/2 spec and attempts +// to be as performant as possible. class StreamFlowControl final : public StreamFlowControlBase { public: StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); From 0aaee995a3202db07ac0c07b1fbfc863824ca13c Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 8 Dec 2017 12:45:55 -0800 Subject: [PATCH 10/16] Plumb through flow_control->flow_conrol_enabled() --- .../chttp2/transport/chttp2_transport.cc | 33 +++++-------------- .../chttp2/transport/flow_control.cc | 19 +++++++++++ .../transport/chttp2/transport/flow_control.h | 22 +++++-------- .../chttp2/transport/frame_settings.cc | 7 ++-- .../ext/transport/chttp2/transport/parsing.cc | 2 +- .../chttp2/transport/stream_lists.cc | 4 +-- 6 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1375a0ca131..768cee11906 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -338,12 +338,10 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } t->dirtied_local_settings = 1; - if (!true /*diable flow control */) { - /* Hack: it's common for implementations to assume 65536 bytes initial send - window -- this should by rights be 0 */ - t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - t->sent_local_settings = 0; - } + /* Hack: it's common for implementations to assume 65536 bytes initial send + window -- this should by rights be 0 */ + t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + t->sent_local_settings = 0; t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; if (is_client) { @@ -546,23 +544,8 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } - // Tune the heck out of this - const uint32_t kFrameSize = 1024 * 1024; - if (true /* disable flow control*/) { - t->flow_control.Init(); - t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = - kFrameSize; - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = - kFrameSize; - t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = - kFrameSize; - t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = - grpc_core::chttp2::kMaxWindow; - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = - grpc_core::chttp2::kMaxWindow; - t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = - grpc_core::chttp2::kMaxWindow; + t->flow_control.Init(t); enable_bdp = false; } else { t->flow_control.Init(exec_ctx, t, @@ -739,13 +722,13 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, post_destructive_reclaimer(exec_ctx, t); } - if (true /* disable flow control */) { - s->flow_control.Init(); - } else { + if (t->flow_control->flow_control_enabled()) { s->flow_control.Init( static_cast( t->flow_control.get()), s); + } else { + s->flow_control.Init(); } GPR_TIMER_END("init_stream", 0); diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 8a057bd9ff4..67601d1530a 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -149,6 +149,25 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const { gpr_free(mf_str); } +TransportFlowControlDisabled::TransportFlowControlDisabled( + grpc_chttp2_transport* t) { + remote_window_ = kMaxWindow; + target_initial_window_size_ = kMaxWindow; + announced_window_ = kMaxWindow; + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; +} + TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, bool enable_bdp_probe) diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 8e05b0c08c2..750b7556230 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -45,11 +45,12 @@ namespace chttp2 { static constexpr uint32_t kDefaultWindow = 65535; static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1); +// TODO(ncteisen): Tune this +static constexpr uint32_t kFrameSize = 1024 * 1024; class TransportFlowControl; class StreamFlowControl; - // Encapsulates a collections of actions the transport needs to take with // regard to flow control. Each action comes with urgencies that tell the // transport how quickly the action must take place. @@ -153,12 +154,12 @@ class TransportFlowControlBase { // Called to check if the transport needs to send a WINDOW_UPDATE frame virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } - // Using the protected members, returns and Action to be taken by the + // Using the protected members, returns and Action to be taken by the // tranport. virtual FlowControlAction MakeAction() { abort(); } - // Using the protected members, returns and Action to be taken by the - // tranport. Also checks for updates to our BDP estimate and acts + // Using the protected members, returns and Action to be taken by the + // tranport. Also checks for updates to our BDP estimate and acts // accordingly. virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); } @@ -166,7 +167,7 @@ class TransportFlowControlBase { // data on the wire virtual void StreamSentData(int64_t size) { abort(); } - // Called to do bookkeeping when a stream owned by this transport receives + // Called to do bookkeeping when a stream owned by this transport receives // data from the wire. Also does error checking for frame size. virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } @@ -202,13 +203,8 @@ class TransportFlowControlBase { // performance. class TransportFlowControlDisabled final : public TransportFlowControlBase { public: - // Maxes out all values - TransportFlowControlDisabled() { - remote_window_ = kMaxWindow; - target_initial_window_size_ = kMaxWindow; - announced_window_ = kMaxWindow; - } + TransportFlowControlDisabled(grpc_chttp2_transport* t); bool flow_control_enabled() const override { return false; } @@ -275,7 +271,7 @@ class TransportFlowControl final : public TransportFlowControlBase { remote_window_ += size; } - // See comment above announced_stream_total_over_incoming_window_ for the + // See comment above announced_stream_total_over_incoming_window_ for the // logic behind this decision. int64_t target_window() const override { return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), @@ -357,7 +353,7 @@ class StreamFlowControlBase { // Updates an action using the protected members. virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } - // Using the protected members, returns an Action for this stream to be + // Using the protected members, returns an Action for this stream to be // taken by the tranport. virtual FlowControlAction MakeAction() { abort(); } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 1fd3a66463a..0e40213a1c2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -187,9 +187,10 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p, if (grpc_wire_id_to_setting_id(parser->id, &id)) { const grpc_chttp2_setting_parameters* sp = &grpc_chttp2_settings_parameters[id]; - if ((id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE || - id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE) && - true /* disable flow conrtol */) { + // If flow control is disabled we skip these. + if (!t->flow_control->flow_control_enabled() && + (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE || + id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE)) { continue; } if (parser->value < sp->min_value || parser->value > sp->max_value) { diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index dce6aa42276..f2a7e95fc60 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -197,7 +197,7 @@ grpc_error* grpc_chttp2_perform_read(grpc_exec_ctx* exec_ctx, return GRPC_ERROR_NONE; } goto dts_fh_0; /* loop */ - } else if (!true /*disable flow control*/ && + } else if (t->flow_control->flow_control_enabled() && t->incoming_frame_size > t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc index c6e2bada337..3aad8c5823c 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.cc +++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc @@ -183,7 +183,7 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { - GPR_ASSERT(!true /*flow control disabled */); + GPR_ASSERT(t->flow_control->flow_control_enabled()); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } @@ -199,7 +199,7 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { - GPR_ASSERT(!true /*flow control disabled */); + GPR_ASSERT(t->flow_control->flow_control_enabled()); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } From 228089f7b87ac74e17c6d4470cb2ebf0371a0844 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 23 Oct 2017 16:08:40 -0700 Subject: [PATCH 11/16] Add new env var --- doc/environment_variables.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 40af758f693..4796ad067d8 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -127,3 +127,8 @@ some configuration as environment variables that can be set. there is no active polling thread. They help reconnect disconnected client channels (mostly due to idleness), so that the next RPC on this channel won't fail. Set to 0 to turn off the backup polls. + +* GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL + if set, flow control will be effectively disabled. Max out all values and + assume the remote peer does the same. Thus we can ignore any flow control + bookkeeping, error checking, and decision making From a77fb7dc6792a48edc30100482032844c56feedc Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 8 Dec 2017 12:57:26 -0800 Subject: [PATCH 12/16] Read new env var to toggle --- .../chttp2/transport/chttp2_transport.cc | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 768cee11906..f300ae91885 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -261,6 +261,9 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); +// -1 == unset, 0 == disabled, 1 == enabled +static int flow_control_enabled = -1; + static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { @@ -544,12 +547,19 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, } } - if (true /* disable flow control*/) { - t->flow_control.Init(t); - enable_bdp = false; - } else { + if (flow_control_enabled == -1) { + char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); + if (env_variable != nullptr) flow_control_enabled = 0; + else flow_control_enabled = 1; + gpr_free(env_variable); + } + + if (flow_control_enabled) { t->flow_control.Init(exec_ctx, t, enable_bdp); + } else { + t->flow_control.Init(t); + enable_bdp = false; } /* No pings allowed before receiving a header or data frame. */ From d230ad086a894cc963235b27814e19bf686eb7aa Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 2 Jan 2018 11:43:53 -0800 Subject: [PATCH 13/16] clang fmt --- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index caf01c7e4a6..c13c4056c11 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -517,10 +517,9 @@ static void init_transport(grpc_chttp2_transport* t, if (flow_control_enabled == -1) { char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); - if (env_variable != nullptr){ + if (env_variable != nullptr) { flow_control_enabled = 0; - } - else { + } else { flow_control_enabled = 1; } gpr_free(env_variable); From 3c747f1bd8294aa11c2d9d5aa17099441c193107 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 3 Jan 2018 14:24:49 -0800 Subject: [PATCH 14/16] Fix tsan --- .../ext/transport/chttp2/transport/chttp2_transport.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5ee1f08e614..97f79d9bdf3 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -233,7 +233,7 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); // -1 == unset, 0 == disabled, 1 == enabled -static int flow_control_enabled = -1; +static gpr_atm flow_control_enabled = -1; static void init_transport(grpc_chttp2_transport* t, const grpc_channel_args* channel_args, @@ -520,17 +520,17 @@ static void init_transport(grpc_chttp2_transport* t, } } - if (flow_control_enabled == -1) { + if (gpr_atm_no_barrier_load(&flow_control_enabled) == -1) { char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); if (env_variable != nullptr) { - flow_control_enabled = 0; + gpr_atm_no_barrier_store(&flow_control_enabled, 0); } else { - flow_control_enabled = 1; + gpr_atm_no_barrier_store(&flow_control_enabled, 1); } gpr_free(env_variable); } - if (flow_control_enabled) { + if (gpr_atm_no_barrier_load(&flow_control_enabled)) { t->flow_control.Init(t, enable_bdp); } else { From bddffb24b4158aeeab9814822491c05c5af29167 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 3 Jan 2018 22:37:29 -0800 Subject: [PATCH 15/16] Fix objc compile --- src/core/ext/transport/chttp2/transport/flow_control.cc | 6 +++--- src/core/ext/transport/chttp2/transport/flow_control.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index b4864077bdd..3013db23f70 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -337,7 +337,7 @@ double TransportFlowControl::SmoothLogBdp(double value) { } FlowControlAction::Urgency TransportFlowControl::DeltaUrgency( - int32_t value, grpc_chttp2_setting_id setting_id) { + int64_t value, grpc_chttp2_setting_id setting_id) { int64_t delta = (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id]; // TODO(ncteisen): tune this @@ -363,7 +363,7 @@ FlowControlAction TransportFlowControl::PeriodicUpdate() { action.set_send_initial_window_update( DeltaUrgency(target_initial_window_size_, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE), - target_initial_window_size_); + (uint32_t)target_initial_window_size_); // get bandwidth estimate and update max_frame accordingly. double bw_dbl = bdp_estimator_.EstimateBandwidth(); @@ -373,7 +373,7 @@ FlowControlAction TransportFlowControl::PeriodicUpdate() { target_initial_window_size_), 16384, 16777215); action.set_send_max_frame_size_update( - DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), + DeltaUrgency((int64_t)frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), frame_size); } return UpdateAction(action); diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 789f755c386..38b9f50c8d4 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -304,7 +304,7 @@ class TransportFlowControl final : public TransportFlowControlBase { private: double TargetLogBdp(); double SmoothLogBdp(double value); - FlowControlAction::Urgency DeltaUrgency(int32_t value, + FlowControlAction::Urgency DeltaUrgency(int64_t value, grpc_chttp2_setting_id setting_id); FlowControlAction UpdateAction(FlowControlAction action) { From 157815d5cd5fc257d568bcfa87baa18325608000 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 10 Jan 2018 17:58:46 -0800 Subject: [PATCH 16/16] Reviewer feedback --- .../chttp2/transport/chttp2_plugin.cc | 10 +++++++++- .../chttp2/transport/chttp2_transport.cc | 19 +++++-------------- .../chttp2/transport/chttp2_transport.h | 2 ++ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.cc b/src/core/ext/transport/chttp2/transport/chttp2_plugin.cc index 97c1878f340..3aca61fdac7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.cc @@ -18,8 +18,16 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/transport/metadata.h" -void grpc_chttp2_plugin_init(void) {} +void grpc_chttp2_plugin_init(void) { + g_flow_control_enabled = true; + char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); + if (env_variable != nullptr) { + g_flow_control_enabled = false; + gpr_free(env_variable); + } +} void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 97f79d9bdf3..835de6aa0f4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -152,6 +152,10 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); static void reset_byte_stream(void* arg, grpc_error* error); +// Flow control default enabled. Can be disabled by setting +// GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL +bool g_flow_control_enabled = true; + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -232,9 +236,6 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -// -1 == unset, 0 == disabled, 1 == enabled -static gpr_atm flow_control_enabled = -1; - static void init_transport(grpc_chttp2_transport* t, const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { @@ -520,17 +521,7 @@ static void init_transport(grpc_chttp2_transport* t, } } - if (gpr_atm_no_barrier_load(&flow_control_enabled) == -1) { - char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); - if (env_variable != nullptr) { - gpr_atm_no_barrier_store(&flow_control_enabled, 0); - } else { - gpr_atm_no_barrier_store(&flow_control_enabled, 1); - } - gpr_free(env_variable); - } - - if (gpr_atm_no_barrier_load(&flow_control_enabled)) { + if (g_flow_control_enabled) { t->flow_control.Init(t, enable_bdp); } else { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 596ababb194..34519ceec9c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -27,6 +27,8 @@ extern grpc_core::TraceFlag grpc_http_trace; extern grpc_core::TraceFlag grpc_trace_http2_stream_state; extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount; +extern bool g_flow_control_enabled; + grpc_transport* grpc_create_chttp2_transport( const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client);