Plumb through flow_control->flow_conrol_enabled()

pull/13289/head
ncteisen 7 years ago
parent 734256a5d0
commit 0aaee995a3
  1. 33
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 19
      src/core/ext/transport/chttp2/transport/flow_control.cc
  3. 22
      src/core/ext/transport/chttp2/transport/flow_control.h
  4. 7
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  5. 2
      src/core/ext/transport/chttp2/transport/parsing.cc
  6. 4
      src/core/ext/transport/chttp2/transport/stream_lists.cc

@ -338,12 +338,10 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
}
}
t->dirtied_local_settings = 1;
if (!true /*diable flow control */) {
/* Hack: it's common for implementations to assume 65536 bytes initial send
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
}
/* Hack: it's common for implementations to assume 65536 bytes initial send
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
if (is_client) {
@ -546,23 +544,8 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
}
}
// Tune the heck out of this
const uint32_t kFrameSize = 1024 * 1024;
if (true /* disable flow control*/) {
t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>();
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
grpc_core::chttp2::kMaxWindow;
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
grpc_core::chttp2::kMaxWindow;
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
grpc_core::chttp2::kMaxWindow;
t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t);
enable_bdp = false;
} else {
t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(exec_ctx, t,
@ -739,13 +722,13 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
post_destructive_reclaimer(exec_ctx, t);
}
if (true /* disable flow control */) {
s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
} else {
if (t->flow_control->flow_control_enabled()) {
s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
static_cast<grpc_core::chttp2::TransportFlowControl*>(
t->flow_control.get()),
s);
} else {
s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
}
GPR_TIMER_END("init_stream", 0);

@ -149,6 +149,25 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
gpr_free(mf_str);
}
TransportFlowControlDisabled::TransportFlowControlDisabled(
grpc_chttp2_transport* t) {
remote_window_ = kMaxWindow;
target_initial_window_size_ = kMaxWindow;
announced_window_ = kMaxWindow;
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
kFrameSize;
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
kMaxWindow;
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
kMaxWindow;
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
kMaxWindow;
}
TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx,
const grpc_chttp2_transport* t,
bool enable_bdp_probe)

@ -45,11 +45,12 @@ namespace chttp2 {
static constexpr uint32_t kDefaultWindow = 65535;
static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1);
// TODO(ncteisen): Tune this
static constexpr uint32_t kFrameSize = 1024 * 1024;
class TransportFlowControl;
class StreamFlowControl;
// Encapsulates a collections of actions the transport needs to take with
// regard to flow control. Each action comes with urgencies that tell the
// transport how quickly the action must take place.
@ -153,12 +154,12 @@ class TransportFlowControlBase {
// Called to check if the transport needs to send a WINDOW_UPDATE frame
virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); }
// Using the protected members, returns and Action to be taken by the
// Using the protected members, returns and Action to be taken by the
// tranport.
virtual FlowControlAction MakeAction() { abort(); }
// Using the protected members, returns and Action to be taken by the
// tranport. Also checks for updates to our BDP estimate and acts
// Using the protected members, returns and Action to be taken by the
// tranport. Also checks for updates to our BDP estimate and acts
// accordingly.
virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); }
@ -166,7 +167,7 @@ class TransportFlowControlBase {
// data on the wire
virtual void StreamSentData(int64_t size) { abort(); }
// Called to do bookkeeping when a stream owned by this transport receives
// Called to do bookkeeping when a stream owned by this transport receives
// data from the wire. Also does error checking for frame size.
virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
@ -202,13 +203,8 @@ class TransportFlowControlBase {
// performance.
class TransportFlowControlDisabled final : public TransportFlowControlBase {
public:
// Maxes out all values
TransportFlowControlDisabled() {
remote_window_ = kMaxWindow;
target_initial_window_size_ = kMaxWindow;
announced_window_ = kMaxWindow;
}
TransportFlowControlDisabled(grpc_chttp2_transport* t);
bool flow_control_enabled() const override { return false; }
@ -275,7 +271,7 @@ class TransportFlowControl final : public TransportFlowControlBase {
remote_window_ += size;
}
// See comment above announced_stream_total_over_incoming_window_ for the
// See comment above announced_stream_total_over_incoming_window_ for the
// logic behind this decision.
int64_t target_window() const override {
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
@ -357,7 +353,7 @@ class StreamFlowControlBase {
// Updates an action using the protected members.
virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); }
// Using the protected members, returns an Action for this stream to be
// Using the protected members, returns an Action for this stream to be
// taken by the tranport.
virtual FlowControlAction MakeAction() { abort(); }

@ -187,9 +187,10 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p,
if (grpc_wire_id_to_setting_id(parser->id, &id)) {
const grpc_chttp2_setting_parameters* sp =
&grpc_chttp2_settings_parameters[id];
if ((id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ||
id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE) &&
true /* disable flow conrtol */) {
// If flow control is disabled we skip these.
if (!t->flow_control->flow_control_enabled() &&
(id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ||
id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE)) {
continue;
}
if (parser->value < sp->min_value || parser->value > sp->max_value) {

@ -197,7 +197,7 @@ grpc_error* grpc_chttp2_perform_read(grpc_exec_ctx* exec_ctx,
return GRPC_ERROR_NONE;
}
goto dts_fh_0; /* loop */
} else if (!true /*disable flow control*/ &&
} else if (t->flow_control->flow_control_enabled() &&
t->incoming_frame_size >
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {

@ -183,7 +183,7 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
GPR_ASSERT(!true /*flow control disabled */);
GPR_ASSERT(t->flow_control->flow_control_enabled());
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
@ -199,7 +199,7 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
GPR_ASSERT(!true /*flow control disabled */);
GPR_ASSERT(t->flow_control->flow_control_enabled());
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}

Loading…
Cancel
Save