[call-v3] Add state machinery for flow control (#37867)

So far missing for HTTP/2 style flow control has been a primitive to query whether there's a receiver for flow control data at the other end of the message pipes.

Here I'm updating the state machine accessors to accommodate that functionality.

No new states were needed.

Whilst here, document the current member functions on `CallState`.

Closes #37867

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37867 from ctiller:like-the-river c9814c737d
PiperOrigin-RevId: 684972125
pull/37903/head
Craig Tiller 4 months ago committed by Copybara-Service
parent 7fb7d3c3b6
commit 6bfaae4468
  1. 105
      src/core/lib/transport/call_state.h
  2. 33
      test/core/transport/call_state_test.cc

@ -29,29 +29,80 @@ namespace grpc_core {
class CallState {
public:
CallState();
/////////////////////////////////////////////////////////////////////////////
// Misc events
// Start the call: allows pulls to proceed
void Start();
/////////////////////////////////////////////////////////////////////////////
// PUSH: client -> server
// Poll for the next message pull to be started.
// This can be used for flow control by waiting for the reader to request
// data, then providing flow control tokens to read, and finally pushing the
// message.
Poll<StatusFlag> PollPullClientToServerMessageStarted();
// Begin a message push.
void BeginPushClientToServerMessage();
// Poll for the push to be completed (up to FinishPullClientToServerMessage).
Poll<StatusFlag> PollPushClientToServerMessage();
// Note that the client has half-closed the stream.
void ClientToServerHalfClose();
/////////////////////////////////////////////////////////////////////////////
// PULL: client -> server
// Begin pulling client initial metadata.
void BeginPullClientInitialMetadata();
// Finish pulling client initial metadata.
void FinishPullClientInitialMetadata();
// Poll for the next message pull to be available.
// Resolves to true if a message is available, false if the call is
// half-closed, and Failure if the call is cancelled.
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
// Finish pulling a message.
void FinishPullClientToServerMessage();
/////////////////////////////////////////////////////////////////////////////
// PUSH: server -> client
// Push server initial metadata (instantaneous).
StatusFlag PushServerInitialMetadata();
// Poll for the next message pull to be started.
// This can be used for flow control by waiting for the reader to request
// data, then providing flow control tokens to read, and finally pushing the
// message.
Poll<StatusFlag> PollPullServerToClientMessageStarted();
// Begin a message push.
void BeginPushServerToClientMessage();
// Poll for the push to be completed (up to FinishPullServerToClientMessage).
Poll<StatusFlag> PollPushServerToClientMessage();
// Push server trailing metadata.
// This is idempotent: only the first call will have any effect.
// Returns true if this is the first call.
bool PushServerTrailingMetadata(bool cancel);
/////////////////////////////////////////////////////////////////////////////
// PULL: server -> client
// Poll for initial metadata to be available.
Poll<bool> PollPullServerInitialMetadataAvailable();
// Finish pulling server initial metadata.
void FinishPullServerInitialMetadata();
// Poll for the next message pull to be available.
// Resolves to true if a message is available, false if trailing metadata is
// ready, and Failure if the call is cancelled.
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
// Finish pulling a message.
void FinishPullServerToClientMessage();
// Poll for trailing metadata to be available.
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
// Instantaneously return true if server trailing metadata has been pulled.
bool WasServerTrailingMetadataPulled() const;
// Resolves after server trailing metadata has been pulled, to true if the
// call was cancelled, and false otherwise.
@ -59,6 +110,8 @@ class CallState {
// Return true if server trailing metadata has been pushed *and* that push was
// a cancellation.
bool WasCancelledPushed() const;
/////////////////////////////////////////////////////////////////////////////
// Debug
std::string DebugString() const;
@ -419,6 +472,7 @@ CallState::PollPullClientToServerMessageAvailable() {
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
client_to_server_pull_waiter_.Wake();
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
@ -447,6 +501,25 @@ CallState::PollPullClientToServerMessageAvailable() {
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPullClientToServerMessageStarted() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPullClientToServerMessageStarted: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
return Success{};
case ClientToServerPullState::kTerminated:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call_state, INFO)
@ -758,6 +831,7 @@ CallState::PollPullServerToClientMessageAvailable() {
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
server_to_client_pull_waiter_.Wake();
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
@ -797,6 +871,29 @@ CallState::PollPullServerToClientMessageAvailable() {
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPullServerToClientMessageStarted() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPullClientToServerMessageStarted: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
return Success{};
case ServerToClientPullState::kTerminated:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call_state, INFO)
@ -892,9 +989,6 @@ CallState::PollServerTrailingMetadataAvailable() {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
@ -904,9 +998,10 @@ CallState::PollServerTrailingMetadataAvailable() {
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
LOG(FATAL) << "PollServerTrailingMetadataAvailable completed twice";
}
return Empty{};
}

@ -300,6 +300,39 @@ TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
TEST(CallStateTest, CanWaitForPullClientMessage) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
EXPECT_THAT(state.PollPullClientToServerMessageStarted(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPullClientToServerMessageStarted(), IsPending());
// TODO(ctiller): consider adding another wakeup set to CallState to eliminate
// this wakeup (trade memory for cpu)
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPullClientToServerMessageStarted(), IsPending());
EXPECT_WAKEUP(activity, state.PollPullClientToServerMessageAvailable());
EXPECT_THAT(state.PollPullClientToServerMessageStarted(), IsReady(Success{}));
}
TEST(CallStateTest, CanWaitForPullServerMessage) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsPending());
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsPending());
EXPECT_WAKEUP(
activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsPending());
EXPECT_WAKEUP(activity, state.PollPullServerToClientMessageAvailable());
EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsReady(Success{}));
}
} // namespace grpc_core
int main(int argc, char** argv) {

Loading…
Cancel
Save