|
|
@ -44,10 +44,15 @@ namespace grpc_core { |
|
|
|
namespace chttp2 { |
|
|
|
namespace chttp2 { |
|
|
|
|
|
|
|
|
|
|
|
static constexpr uint32_t kDefaultWindow = 65535; |
|
|
|
static constexpr uint32_t kDefaultWindow = 65535; |
|
|
|
|
|
|
|
static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1); |
|
|
|
|
|
|
|
|
|
|
|
class TransportFlowControl; |
|
|
|
class TransportFlowControl; |
|
|
|
class StreamFlowControl; |
|
|
|
class StreamFlowControl; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Encapsulates a collections of actions the transport needs to take with
|
|
|
|
|
|
|
|
// regard to flow control. Each action comes with urgencies that tell the
|
|
|
|
|
|
|
|
// transport how quickly the action must take place.
|
|
|
|
class FlowControlAction { |
|
|
|
class FlowControlAction { |
|
|
|
public: |
|
|
|
public: |
|
|
|
enum class Urgency : uint8_t { |
|
|
|
enum class Urgency : uint8_t { |
|
|
@ -133,21 +138,53 @@ class FlowControlTrace { |
|
|
|
int64_t announced_window_delta_; |
|
|
|
int64_t announced_window_delta_; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Fat interface with all methods a flow control implementation needs to
|
|
|
|
|
|
|
|
// support. gRPC C Core does not support pure virtual functions, so instead
|
|
|
|
|
|
|
|
// we abort in any methods which require implementation in the base class.
|
|
|
|
class TransportFlowControlBase { |
|
|
|
class TransportFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
TransportFlowControlBase() {} |
|
|
|
TransportFlowControlBase() {} |
|
|
|
virtual ~TransportFlowControlBase() {} |
|
|
|
virtual ~TransportFlowControlBase() {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Is flow control enabled? This is needed in other codepaths like the checks
|
|
|
|
|
|
|
|
// in parsing and in writing.
|
|
|
|
|
|
|
|
virtual bool flow_control_enabled() const { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Called to check if the transport needs to send a WINDOW_UPDATE frame
|
|
|
|
virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } |
|
|
|
virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Using the protected members, returns and Action to be taken by the
|
|
|
|
|
|
|
|
// tranport.
|
|
|
|
virtual FlowControlAction MakeAction() { abort(); } |
|
|
|
virtual FlowControlAction MakeAction() { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Using the protected members, returns and Action to be taken by the
|
|
|
|
|
|
|
|
// tranport. Also checks for updates to our BDP estimate and acts
|
|
|
|
|
|
|
|
// accordingly.
|
|
|
|
virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); } |
|
|
|
virtual FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Called to do bookkeeping when a stream owned by this transport sends
|
|
|
|
|
|
|
|
// data on the wire
|
|
|
|
virtual void StreamSentData(int64_t size) { abort(); } |
|
|
|
virtual void StreamSentData(int64_t size) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Called to do bookkeeping when a stream owned by this transport receives
|
|
|
|
|
|
|
|
// data from the wire. Also does error checking for frame size.
|
|
|
|
virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } |
|
|
|
virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
|
|
|
|
virtual void RecvUpdate(uint32_t size) { abort(); } |
|
|
|
virtual void RecvUpdate(uint32_t size) { abort(); } |
|
|
|
// TODO(ncteisen): maybe completely encapsulate this inside FlowControl
|
|
|
|
|
|
|
|
|
|
|
|
// Returns the BdpEstimator held by this object. Caller is responsible for
|
|
|
|
|
|
|
|
// checking for nullptr. TODO(ncteisen): consider fully encapsulating all
|
|
|
|
|
|
|
|
// bdp estimator actions inside TransportFlowControl
|
|
|
|
virtual BdpEstimator* bdp_estimator() { return nullptr; } |
|
|
|
virtual BdpEstimator* bdp_estimator() { return nullptr; } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Getters
|
|
|
|
int64_t remote_window() const { return remote_window_; } |
|
|
|
int64_t remote_window() const { return remote_window_; } |
|
|
|
virtual int64_t target_window() const { return target_initial_window_size_; } |
|
|
|
virtual int64_t target_window() const { return target_initial_window_size_; } |
|
|
|
int64_t announced_window() const { return announced_window_; } |
|
|
|
int64_t announced_window() const { return announced_window_; } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Used in certain benchmarks in which we don't want FlowControl to be a
|
|
|
|
|
|
|
|
// factor
|
|
|
|
virtual void TestOnlyForceHugeWindow() {} |
|
|
|
virtual void TestOnlyForceHugeWindow() {} |
|
|
|
|
|
|
|
|
|
|
|
GRPC_ABSTRACT_BASE_CLASS |
|
|
|
GRPC_ABSTRACT_BASE_CLASS |
|
|
@ -159,15 +196,23 @@ class TransportFlowControlBase { |
|
|
|
int64_t announced_window_ = kDefaultWindow; |
|
|
|
int64_t announced_window_ = kDefaultWindow; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
const int64_t kMaxWindow = (int64_t)((1u << 31) - 1); |
|
|
|
// Implementation of flow control that does NOTHING. Always returns maximum
|
|
|
|
|
|
|
|
// values, never initiates writes, and assumes that the remote peer is doing
|
|
|
|
|
|
|
|
// the same. To be used to narrow down on flow control as the cause of negative
|
|
|
|
|
|
|
|
// performance.
|
|
|
|
class TransportFlowControlDisabled final : public TransportFlowControlBase { |
|
|
|
class TransportFlowControlDisabled final : public TransportFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Maxes out all values
|
|
|
|
TransportFlowControlDisabled() { |
|
|
|
TransportFlowControlDisabled() { |
|
|
|
remote_window_ = kMaxWindow; |
|
|
|
remote_window_ = kMaxWindow; |
|
|
|
target_initial_window_size_ = kMaxWindow; |
|
|
|
target_initial_window_size_ = kMaxWindow; |
|
|
|
announced_window_ = kMaxWindow; |
|
|
|
announced_window_ = kMaxWindow; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool flow_control_enabled() const override { return false; } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Never do anything.
|
|
|
|
uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } |
|
|
|
uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } |
|
|
|
FlowControlAction MakeAction() override { return FlowControlAction(); } |
|
|
|
FlowControlAction MakeAction() override { return FlowControlAction(); } |
|
|
|
FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override { |
|
|
|
FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx) override { |
|
|
@ -178,15 +223,18 @@ class TransportFlowControlDisabled final : public TransportFlowControlBase { |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} |
|
|
|
} |
|
|
|
void RecvUpdate(uint32_t size) override {} |
|
|
|
void RecvUpdate(uint32_t size) override {} |
|
|
|
int64_t target_window() const override { return kMaxWindow; } |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Implementation of flow control that abides to HTTP/2 spec and attempts
|
|
|
|
|
|
|
|
// to be as performant as possible.
|
|
|
|
class TransportFlowControl final : public TransportFlowControlBase { |
|
|
|
class TransportFlowControl final : public TransportFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, |
|
|
|
TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, |
|
|
|
bool enable_bdp_probe); |
|
|
|
bool enable_bdp_probe); |
|
|
|
~TransportFlowControl() {} |
|
|
|
~TransportFlowControl() {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool flow_control_enabled() const override { return true; } |
|
|
|
|
|
|
|
|
|
|
|
bool bdp_probe() const { return enable_bdp_probe_; } |
|
|
|
bool bdp_probe() const { return enable_bdp_probe_; } |
|
|
|
|
|
|
|
|
|
|
|
// returns an announce if we should send a transport update to our peer,
|
|
|
|
// returns an announce if we should send a transport update to our peer,
|
|
|
@ -227,6 +275,8 @@ class TransportFlowControl final : public TransportFlowControlBase { |
|
|
|
remote_window_ += size; |
|
|
|
remote_window_ += size; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// See comment above announced_stream_total_over_incoming_window_ for the
|
|
|
|
|
|
|
|
// logic behind this decision.
|
|
|
|
int64_t target_window() const override { |
|
|
|
int64_t target_window() const override { |
|
|
|
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), |
|
|
|
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), |
|
|
|
announced_stream_total_over_incoming_window_ + |
|
|
|
announced_stream_total_over_incoming_window_ + |
|
|
@ -296,21 +346,46 @@ class TransportFlowControl final : public TransportFlowControlBase { |
|
|
|
grpc_millis last_pid_update_ = 0; |
|
|
|
grpc_millis last_pid_update_ = 0; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Fat interface with all methods a stream flow control implementation needs
|
|
|
|
|
|
|
|
// to support. gRPC C Core does not support pure virtual functions, so instead
|
|
|
|
|
|
|
|
// we abort in any methods which require implementation in the base class.
|
|
|
|
class StreamFlowControlBase { |
|
|
|
class StreamFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
StreamFlowControlBase() {} |
|
|
|
StreamFlowControlBase() {} |
|
|
|
virtual ~StreamFlowControlBase() {} |
|
|
|
virtual ~StreamFlowControlBase() {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Updates an action using the protected members.
|
|
|
|
virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } |
|
|
|
virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Using the protected members, returns an Action for this stream to be
|
|
|
|
|
|
|
|
// taken by the tranport.
|
|
|
|
virtual FlowControlAction MakeAction() { abort(); } |
|
|
|
virtual FlowControlAction MakeAction() { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Bookkeeping for when data is sent on this stream.
|
|
|
|
virtual void SentData(int64_t outgoing_frame_size) { abort(); } |
|
|
|
virtual void SentData(int64_t outgoing_frame_size) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Bookkeeping and error checking for when data is received by this stream.
|
|
|
|
virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } |
|
|
|
virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Called to check if this stream needs to send a WINDOW_UPDATE frame.
|
|
|
|
virtual uint32_t MaybeSendUpdate() { abort(); } |
|
|
|
virtual uint32_t MaybeSendUpdate() { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
|
|
|
|
virtual void RecvUpdate(uint32_t size) { abort(); } |
|
|
|
virtual void RecvUpdate(uint32_t size) { abort(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Bookkeeping for when a call pulls bytes out of the transport. At this
|
|
|
|
|
|
|
|
// point we consider the data 'used' and can thus let out peer know we are
|
|
|
|
|
|
|
|
// ready for more data.
|
|
|
|
virtual void IncomingByteStreamUpdate(size_t max_size_hint, |
|
|
|
virtual void IncomingByteStreamUpdate(size_t max_size_hint, |
|
|
|
size_t have_already) { |
|
|
|
size_t have_already) { |
|
|
|
abort(); |
|
|
|
abort(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Used in certain benchmarks in which we don't want FlowControl to be a
|
|
|
|
|
|
|
|
// factor
|
|
|
|
virtual void TestOnlyForceHugeWindow() {} |
|
|
|
virtual void TestOnlyForceHugeWindow() {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Getters
|
|
|
|
int64_t remote_window_delta() { return remote_window_delta_; } |
|
|
|
int64_t remote_window_delta() { return remote_window_delta_; } |
|
|
|
int64_t local_window_delta() { return local_window_delta_; } |
|
|
|
int64_t local_window_delta() { return local_window_delta_; } |
|
|
|
int64_t announced_window_delta() { return announced_window_delta_; } |
|
|
|
int64_t announced_window_delta() { return announced_window_delta_; } |
|
|
@ -324,6 +399,10 @@ class StreamFlowControlBase { |
|
|
|
int64_t announced_window_delta_ = 0; |
|
|
|
int64_t announced_window_delta_ = 0; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Implementation of flow control that does NOTHING. Always returns maximum
|
|
|
|
|
|
|
|
// values, never initiates writes, and assumes that the remote peer is doing
|
|
|
|
|
|
|
|
// the same. To be used to narrow down on flow control as the cause of negative
|
|
|
|
|
|
|
|
// performance.
|
|
|
|
class StreamFlowControlDisabled : public StreamFlowControlBase { |
|
|
|
class StreamFlowControlDisabled : public StreamFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
FlowControlAction UpdateAction(FlowControlAction action) override { |
|
|
|
FlowControlAction UpdateAction(FlowControlAction action) override { |
|
|
@ -340,6 +419,8 @@ class StreamFlowControlDisabled : public StreamFlowControlBase { |
|
|
|
size_t have_already) override {} |
|
|
|
size_t have_already) override {} |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Implementation of flow control that abides to HTTP/2 spec and attempts
|
|
|
|
|
|
|
|
// to be as performant as possible.
|
|
|
|
class StreamFlowControl final : public StreamFlowControlBase { |
|
|
|
class StreamFlowControl final : public StreamFlowControlBase { |
|
|
|
public: |
|
|
|
public: |
|
|
|
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); |
|
|
|
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); |
|
|
|