First pass at abstract base flow control class

pull/13289/head
ncteisen 7 years ago
parent 96311af518
commit 04d8bc2c9b
  1. 8
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 95
      src/core/ext/transport/chttp2/transport/flow_control.h
  3. 8
      src/core/ext/transport/chttp2/transport/internal.h

@ -544,7 +544,8 @@ static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
}
}
t->flow_control.Init(exec_ctx, t, enable_bdp);
t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(exec_ctx, t,
enable_bdp);
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
@ -716,7 +717,10 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
post_destructive_reclaimer(exec_ctx, t);
}
s->flow_control.Init(t->flow_control.get(), s);
s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
static_cast<grpc_core::chttp2::TransportFlowControl *>(
t->flow_control.get()),
s);
GPR_TIMER_END("init_stream", 0);
return 0;

@ -24,6 +24,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/support/abstract.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
@ -132,7 +133,30 @@ class FlowControlTrace {
int64_t announced_window_delta_;
};
class TransportFlowControl {
class TransportFlowControlBase {
public:
TransportFlowControlBase() {}
virtual ~TransportFlowControlBase() {}
virtual uint32_t MaybeSendUpdate(bool writing_anyway) {abort();}
virtual FlowControlAction MakeAction() {abort();}
virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) {abort();}
virtual void StreamSentData(int64_t size) {abort();}
virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();}
virtual void RecvUpdate(uint32_t size) {abort();}
virtual BdpEstimator* bdp_estimator() { return nullptr; }
int64_t remote_window() const { return remote_window_; }
virtual int64_t target_window() const { return target_initial_window_size_; }
int64_t announced_window() const{ return announced_window_; }
GRPC_ABSTRACT_BASE_CLASS
protected:
int64_t remote_window_ = kDefaultWindow;
int64_t target_initial_window_size_ = kDefaultWindow;
int64_t announced_window_ = kDefaultWindow;
};
class TransportFlowControl final : public TransportFlowControlBase {
public:
TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t,
bool enable_bdp_probe);
@ -144,25 +168,25 @@ class TransportFlowControl {
// else returns zero; writing_anyway indicates if a write would happen
// regardless of the send - if it is false and this function returns non-zero,
// this announce will cause a write to occur
uint32_t MaybeSendUpdate(bool writing_anyway);
uint32_t MaybeSendUpdate(bool writing_anyway) override;
// Reads the flow control data and returns and actionable struct that will
// tell chttp2 exactly what it needs to do
FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
FlowControlAction MakeAction() override { return UpdateAction(FlowControlAction()); }
// Call periodically (at a low-ish rate, 100ms - 10s makes sense)
// to perform more complex flow control calculations and return an action
// to let chttp2 change its parameters
FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx);
FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override;
void StreamSentData(int64_t size) { remote_window_ -= size; }
void StreamSentData(int64_t size) override { remote_window_ -= size; }
grpc_error* ValidateRecvData(int64_t incoming_frame_size);
void CommitRecvData(int64_t incoming_frame_size) {
announced_window_ -= incoming_frame_size;
}
grpc_error* RecvData(int64_t incoming_frame_size) {
grpc_error* RecvData(int64_t incoming_frame_size) override {
FlowControlTrace trace(" data recv", this, nullptr);
grpc_error* error = ValidateRecvData(incoming_frame_size);
if (error != GRPC_ERROR_NONE) return error;
@ -171,18 +195,16 @@ class TransportFlowControl {
}
// we have received a WINDOW_UPDATE frame for a transport
void RecvUpdate(uint32_t size) {
void RecvUpdate(uint32_t size) override {
FlowControlTrace trace("t updt recv", this, nullptr);
remote_window_ += size;
}
int64_t remote_window() const { return remote_window_; }
int64_t target_window() const {
int64_t target_window() const override {
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
announced_stream_total_over_incoming_window_ +
target_initial_window_size_);
}
int64_t announced_window() const { return announced_window_; }
const grpc_chttp2_transport* transport() const { return t_; }
@ -202,7 +224,7 @@ class TransportFlowControl {
}
}
BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
void TestOnlyForceHugeWindow() {
announced_window_ = 1024 * 1024 * 1024;
@ -226,9 +248,6 @@ class TransportFlowControl {
const grpc_chttp2_transport* const t_;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window_ = kDefaultWindow;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
@ -240,13 +259,6 @@ class TransportFlowControl {
int64_t announced_stream_total_over_incoming_window_ = 0;
int64_t announced_stream_total_under_incoming_window_ = 0;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window_ = kDefaultWindow;
int32_t target_initial_window_size_ = kDefaultWindow;
/** should we probe bdp? */
const bool enable_bdp_probe_;
@ -258,7 +270,31 @@ class TransportFlowControl {
grpc_millis last_pid_update_ = 0;
};
class StreamFlowControl {
class StreamFlowControlBase {
public:
StreamFlowControlBase() {}
virtual ~StreamFlowControlBase() {}
virtual FlowControlAction UpdateAction(FlowControlAction action) {abort();}
virtual FlowControlAction MakeAction() {abort();}
virtual void SentData(int64_t outgoing_frame_size) {abort();}
virtual grpc_error* RecvData(int64_t incoming_frame_size) {abort();}
virtual uint32_t MaybeSendUpdate() {abort();}
virtual void RecvUpdate(uint32_t size) {abort();}
virtual void IncomingByteStreamUpdate(size_t max_size_hint,
size_t have_already) {abort();}
int64_t remote_window_delta() { return remote_window_delta_; }
int64_t local_window_delta() { return local_window_delta_; }
int64_t announced_window_delta() { return announced_window_delta_; }
GRPC_ABSTRACT_BASE_CLASS
protected:
int64_t remote_window_delta_ = 0;
int64_t local_window_delta_ = 0;
int64_t announced_window_delta_ = 0;
};
class StreamFlowControl final : public StreamFlowControlBase {
public:
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl() {
@ -314,21 +350,6 @@ class StreamFlowControl {
announced_window_delta_ += change;
tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
/** window available for us to send to peer, over or under the initial
* window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta_ = 0;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta_ = 0;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta_ = 0;
};
} // namespace chttp2

@ -351,7 +351,9 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl>
grpc_core::PolymorphicManualConstructor<
grpc_core::chttp2::TransportFlowControlBase,
grpc_core::chttp2::TransportFlowControl>
flow_control;
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
@ -526,7 +528,9 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata;
bool sent_trailing_metadata;
grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl>
grpc_core::PolymorphicManualConstructor<
grpc_core::chttp2::StreamFlowControlBase,
grpc_core::chttp2::StreamFlowControl>
flow_control;
grpc_slice_buffer flow_controlled_buffer;

Loading…
Cancel
Save