pull/37003/head
Craig Tiller 9 months ago
parent 23adb994cf
commit 94c1a142ca
  1. 1
      doc/trace_flags.md
  2. 21
      src/core/BUILD
  3. 2
      src/core/lib/debug/trace_flags.cc
  4. 1
      src/core/lib/debug/trace_flags.h
  5. 4
      src/core/lib/debug/trace_flags.yaml
  6. 5
      src/core/lib/promise/poll.h
  7. 1
      src/core/lib/promise/status_flag.h
  8. 695
      src/core/lib/transport/call_filters.cc
  9. 264
      src/core/lib/transport/call_filters.h
  10. 39
      src/core/lib/transport/call_state.cc
  11. 957
      src/core/lib/transport/call_state.h
  12. 15
      test/core/transport/BUILD
  13. 240
      test/core/transport/call_filters_test.cc
  14. 310
      test/core/transport/call_state_test.cc

1
doc/trace_flags.md generated

@ -90,6 +90,7 @@ accomplished by invoking `bazel build --config=dbg <target>`
- auth_context_refcount - Auth context refcounting.
- call_combiner - Call combiner state.
- call_refcount - Refcount on call.
- call_state - Traces transitions through the call spine state machine.
- closure - Legacy closure creation, scheduling, and completion.
- combiner - Combiner lock state.
- cq_refcount - Completion queue refcounting.

@ -504,6 +504,7 @@ grpc_cc_library(
external_deps = [
"absl/log:check",
"absl/strings:str_format",
"absl/types:optional",
],
language = "c++",
public_hdrs = [
@ -522,6 +523,7 @@ grpc_cc_library(
"absl/log:check",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
@ -7418,6 +7420,24 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "call_state",
srcs = [
"lib/transport/call_state.cc",
],
hdrs = [
"lib/transport/call_state.h",
],
external_deps = ["absl/types:optional"],
deps = [
"activity",
"poll",
"status_flag",
"//:gpr",
"//:grpc_trace",
],
)
grpc_cc_library(
name = "call_filters",
srcs = [
@ -7429,6 +7449,7 @@ grpc_cc_library(
external_deps = ["absl/log:check"],
deps = [
"call_final_info",
"call_state",
"dump_args",
"if",
"latch",

@ -26,6 +26,7 @@ namespace grpc_core {
DebugOnlyTraceFlag auth_context_refcount_trace(false, "auth_context_refcount");
DebugOnlyTraceFlag call_combiner_trace(false, "call_combiner");
DebugOnlyTraceFlag call_refcount_trace(false, "call_refcount");
DebugOnlyTraceFlag call_state_trace(false, "call_state");
DebugOnlyTraceFlag closure_trace(false, "closure");
DebugOnlyTraceFlag combiner_trace(false, "combiner");
DebugOnlyTraceFlag cq_refcount_trace(false, "cq_refcount");
@ -229,6 +230,7 @@ const absl::flat_hash_map<std::string, TraceFlag*>& GetAllTraceFlags() {
{"auth_context_refcount", &auth_context_refcount_trace},
{"call_combiner", &call_combiner_trace},
{"call_refcount", &call_refcount_trace},
{"call_state", &call_state_trace},
{"closure", &closure_trace},
{"combiner", &combiner_trace},
{"cq_refcount", &cq_refcount_trace},

@ -26,6 +26,7 @@ namespace grpc_core {
extern DebugOnlyTraceFlag auth_context_refcount_trace;
extern DebugOnlyTraceFlag call_combiner_trace;
extern DebugOnlyTraceFlag call_refcount_trace;
extern DebugOnlyTraceFlag call_state_trace;
extern DebugOnlyTraceFlag closure_trace;
extern DebugOnlyTraceFlag combiner_trace;
extern DebugOnlyTraceFlag cq_refcount_trace;

@ -54,6 +54,10 @@ call_refcount:
debug_only: true
default: false
description: Refcount on call.
call_state:
debug_only: true
default: false
description: Traces transitions through the call spine state machine.
cares_address_sorting:
default: false
description: Operations of the c-ares based DNS resolver's address sorter.

@ -20,6 +20,7 @@
#include "absl/log/check.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
@ -303,7 +304,8 @@ void AbslStringify(Sink& sink, const Poll<absl::optional<T>>& poll) {
// Hack to get metadata printing
template <typename Sink, typename T, typename Deleter>
void AbslStringify(Sink& sink, const Poll<absl::optional<std::unique_ptr<T, Deleter>>>& poll) {
void AbslStringify(
Sink& sink, const Poll<absl::optional<std::unique_ptr<T, Deleter>>>& poll) {
if (poll.pending()) {
absl::Format(&sink, "<<pending>>");
return;
@ -316,7 +318,6 @@ void AbslStringify(Sink& sink, const Poll<absl::optional<std::unique_ptr<T, Dele
}
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_POLL_H

@ -20,6 +20,7 @@
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>

@ -289,699 +289,4 @@ RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
return RefCountedPtr<Stack>(new Stack(std::move(data_)));
}
///////////////////////////////////////////////////////////////////////////////
// CallState
namespace filters_detail {
CallState::CallState()
: client_to_server_pull_state_(ClientToServerPullState::kBegin),
client_to_server_push_state_(ClientToServerPushState::kIdle),
server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
server_to_client_push_state_(ServerToClientPushState::kStart),
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
}
void CallState::Start() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] Start: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kStarted;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kUnstartedReading:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
}
void CallState::BeginPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
case ClientToServerPushState::kFinished:
break;
}
}
Poll<StatusFlag> CallState::PollPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
return Success{};
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
void CallState::ClientToServerHalfClose() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] ClientToServerHalfClose: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ =
ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
case ClientToServerPushState::kFinished:
break;
}
}
void CallState::BeginPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientInitialMetadata;
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
void CallState::FinishPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
Poll<ValueOrFailure<bool>> CallState::PollPullClientToServerMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullClientToServerMessageAvailable: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
"processing a message";
break;
case ClientToServerPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientToServerMessage;
return true;
case ClientToServerPushState::kPushedHalfClose:
return false;
case ClientToServerPushState::kFinished:
client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
break;
case ClientToServerPullState::kIdle:
LOG(FATAL) << "FinishPullClientToServerMessage called twice";
break;
case ClientToServerPullState::kReading:
LOG(FATAL) << "FinishPullClientToServerMessage called before "
"PollPullClientToServerMessageAvailable";
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kTerminated:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ = ClientToServerPushState::kIdle;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
break;
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kFinished:
break;
}
}
StatusFlag CallState::PushServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_,
server_trailing_metadata_state_);
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return Failure{};
}
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadata;
server_to_client_push_waiter_.Wake();
return Success{};
}
void CallState::BeginPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "BeginPushServerToClientMessage called before "
"PushServerInitialMetadata";
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
break;
case ServerToClientPushState::kTrailersOnly:
// Will fail in poll.
break;
case ServerToClientPushState::kIdle:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kFinished:
break;
}
}
Poll<StatusFlag> CallState::PollPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
LOG(FATAL) << "PollPushServerToClientMessage called before "
<< "PushServerInitialMetadata";
case ServerToClientPushState::kTrailersOnly:
return false;
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
return Success{};
case ServerToClientPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
bool CallState::PushServerTrailingMetadata(bool cancel) {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
server_to_client_push_state_,
client_to_server_push_state_,
server_trailing_metadata_waiter_.DebugString());
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_state_ =
cancel ? ServerTrailingMetadataState::kPushedCancel
: ServerTrailingMetadataState::kPushed;
server_trailing_metadata_waiter_.Wake();
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kIdle:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kFinished:
case ServerToClientPushState::kTrailersOnly:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kFinished:
break;
}
return true;
}
Poll<bool> CallState::PollPullServerInitialMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerInitialMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
bool reading;
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
return false;
}
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
reading = true;
break;
case ServerToClientPullState::kStarted:
reading = false;
break;
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
server_to_client_pull_state_ ==
ServerToClientPullState::kStartedReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_pull_state_ =
reading
? ServerToClientPullState::kProcessingServerInitialMetadataReading
: ServerToClientPullState::kProcessingServerInitialMetadata;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL)
<< "PollPullServerInitialMetadataAvailable after metadata processed";
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return false;
case ServerToClientPushState::kTrailersOnly:
return false;
}
Crash("Unreachable");
}
void CallState::FinishPullServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
CHECK_EQ(server_to_client_push_state_,
ServerToClientPushState::kTrailersOnly);
return;
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
server_to_client_pull_state_ == ServerToClientPullState::kReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
"metadata consumed";
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kFinished:
LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
Poll<ValueOrFailure<bool>> CallState::PollPullServerToClientMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerToClientMessageAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerInitialMetadataReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kStartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
return false;
}
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_waiter_.pending();
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kTrailersOnly:
DCHECK_NE(server_trailing_metadata_state_,
ServerTrailingMetadataState::kNotPushed);
return false;
case ServerToClientPushState::kPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerToClientMessage;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
LOG(FATAL)
<< "FinishPullServerToClientMessage called before metadata available";
case ServerToClientPullState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called twice";
case ServerToClientPullState::kReading:
LOG(FATAL) << "FinishPullServerToClientMessage called before "
<< "PollPullServerToClientMessageAvailable";
case ServerToClientPullState::kProcessingServerToClientMessage:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
}
switch (server_to_client_push_state_) {
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
"metadata consumed";
case ServerToClientPushState::kTrailersOnly:
LOG(FATAL) << "FinishPullServerToClientMessage called after "
"PushServerTrailingMetadata";
case ServerToClientPushState::kPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
case ServerToClientPushState::kFinished:
break;
}
}
Poll<Empty> CallState::PollServerTrailingMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollServerTrailingMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kUnstartedReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kReading:
switch (server_to_client_push_state_) {
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kStart:
case ServerToClientPushState::kFinished:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
}
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPushedCancel:
server_trailing_metadata_state_ =
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
}
Poll<bool> CallState::PollWasCancelled() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollWasCancelled: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel: {
return server_trailing_metadata_waiter_.pending();
}
case ServerTrailingMetadataState::kPulled:
return false;
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
std::string CallState::DebugString() const {
return absl::StrCat(
"client_to_server_pull_state:", client_to_server_pull_state_,
" client_to_server_push_state:", client_to_server_push_state_,
" server_to_client_pull_state:", server_to_client_pull_state_,
" server_to_client_message_push_state:", server_to_client_push_state_,
" server_trailing_metadata_state:", server_trailing_metadata_state_,
client_to_server_push_waiter_.DebugString(),
" server_to_client_push_waiter:",
server_to_client_push_waiter_.DebugString(),
" client_to_server_pull_waiter:",
client_to_server_pull_waiter_.DebugString(),
" server_to_client_pull_waiter:",
server_to_client_pull_waiter_.DebugString(),
" server_trailing_metadata_waiter:",
server_trailing_metadata_waiter_.DebugString());
}
static_assert(sizeof(CallState) <= 16, "CallState too large");
} // namespace filters_detail
} // namespace grpc_core

@ -35,6 +35,7 @@
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/call_state.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
@ -1191,244 +1192,6 @@ class InfallibleOperationExecutor {
const InfallibleOperator<T>* end_ops_;
};
class CallState {
public:
CallState();
// Start the call: allows pulls to proceed
void Start();
// PUSH: client -> server
void BeginPushClientToServerMessage();
Poll<StatusFlag> PollPushClientToServerMessage();
void ClientToServerHalfClose();
// PULL: client -> server
void BeginPullClientInitialMetadata();
void FinishPullClientInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
void FinishPullClientToServerMessage();
// PUSH: server -> client
StatusFlag PushServerInitialMetadata();
void BeginPushServerToClientMessage();
Poll<StatusFlag> PollPushServerToClientMessage();
bool PushServerTrailingMetadata(bool cancel);
// PULL: server -> client
Poll<bool> PollPullServerInitialMetadataAvailable();
void FinishPullServerInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
Poll<bool> PollWasCancelled();
// Debug
std::string DebugString() const;
friend std::ostream& operator<<(std::ostream& out,
const CallState& call_state) {
return out << call_state.DebugString();
}
private:
enum class ClientToServerPullState : uint16_t {
// Ready to read: client initial metadata is there, but not yet processed
kBegin,
// Processing client initial metadata
kProcessingClientInitialMetadata,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingClientToServerMessage,
// Processing complete
kTerminated,
};
static const char* ClientToServerPullStateString(
ClientToServerPullState state) {
switch (state) {
case ClientToServerPullState::kBegin:
return "Begin";
case ClientToServerPullState::kProcessingClientInitialMetadata:
return "ProcessingClientInitialMetadata";
case ClientToServerPullState::kIdle:
return "Idle";
case ClientToServerPullState::kReading:
return "Reading";
case ClientToServerPullState::kProcessingClientToServerMessage:
return "ProcessingClientToServerMessage";
case ClientToServerPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPullState state) {
out.Append(ClientToServerPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPullState state) {
return out << ClientToServerPullStateString(state);
}
enum class ClientToServerPushState : uint16_t {
kIdle,
kPushedMessage,
kPushedHalfClose,
kPushedMessageAndHalfClosed,
kFinished,
};
static const char* ClientToServerPushStateString(
ClientToServerPushState state) {
switch (state) {
case ClientToServerPushState::kIdle:
return "Idle";
case ClientToServerPushState::kPushedMessage:
return "PushedMessage";
case ClientToServerPushState::kPushedHalfClose:
return "PushedHalfClose";
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return "PushedMessageAndHalfClosed";
case ClientToServerPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPushState state) {
out.Append(ClientToServerPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPushState state) {
return out << ClientToServerPushStateString(state);
}
enum class ServerToClientPullState : uint16_t {
// Not yet started: cannot read
kUnstarted,
kUnstartedReading,
kStarted,
kStartedReading,
// Processing server initial metadata
kProcessingServerInitialMetadata,
kProcessingServerInitialMetadataReading,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingServerToClientMessage,
// Processing server trailing metadata
kProcessingServerTrailingMetadata,
kTerminated,
};
static const char* ServerToClientPullStateString(
ServerToClientPullState state) {
switch (state) {
case ServerToClientPullState::kUnstarted:
return "Unstarted";
case ServerToClientPullState::kUnstartedReading:
return "UnstartedReading";
case ServerToClientPullState::kStarted:
return "Started";
case ServerToClientPullState::kStartedReading:
return "StartedReading";
case ServerToClientPullState::kProcessingServerInitialMetadata:
return "ProcessingServerInitialMetadata";
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return "ProcessingServerInitialMetadataReading";
case ServerToClientPullState::kIdle:
return "Idle";
case ServerToClientPullState::kReading:
return "Reading";
case ServerToClientPullState::kProcessingServerToClientMessage:
return "ProcessingServerToClientMessage";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
return "ProcessingServerTrailingMetadata";
case ServerToClientPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPullState state) {
out.Append(ServerToClientPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPullState state) {
return out << ServerToClientPullStateString(state);
}
enum class ServerToClientPushState : uint16_t {
kStart,
kPushedServerInitialMetadata,
kPushedServerInitialMetadataAndPushedMessage,
kTrailersOnly,
kIdle,
kPushedMessage,
kFinished,
};
static const char* ServerToClientPushStateString(
ServerToClientPushState state) {
switch (state) {
case ServerToClientPushState::kStart:
return "Start";
case ServerToClientPushState::kPushedServerInitialMetadata:
return "PushedServerInitialMetadata";
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
return "PushedServerInitialMetadataAndPushedMessage";
case ServerToClientPushState::kTrailersOnly:
return "TrailersOnly";
case ServerToClientPushState::kIdle:
return "Idle";
case ServerToClientPushState::kPushedMessage:
return "PushedMessage";
case ServerToClientPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPushState state) {
out.Append(ServerToClientPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPushState state) {
return out << ServerToClientPushStateString(state);
}
enum class ServerTrailingMetadataState : uint16_t {
kNotPushed,
kPushed,
kPushedCancel,
kPulled,
kPulledCancel,
};
static const char* ServerTrailingMetadataStateString(
ServerTrailingMetadataState state) {
switch (state) {
case ServerTrailingMetadataState::kNotPushed:
return "NotPushed";
case ServerTrailingMetadataState::kPushed:
return "Pushed";
case ServerTrailingMetadataState::kPushedCancel:
return "PushedCancel";
case ServerTrailingMetadataState::kPulled:
return "Pulled";
case ServerTrailingMetadataState::kPulledCancel:
return "PulledCancel";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
out.Append(ServerTrailingMetadataStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerTrailingMetadataState state) {
return out << ServerTrailingMetadataStateString(state);
}
ClientToServerPullState client_to_server_pull_state_ : 3;
ClientToServerPushState client_to_server_push_state_ : 3;
ServerToClientPullState server_to_client_pull_state_ : 4;
ServerToClientPushState server_to_client_push_state_ : 3;
ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
IntraActivityWaiter client_to_server_pull_waiter_;
IntraActivityWaiter server_to_client_pull_waiter_;
IntraActivityWaiter client_to_server_push_waiter_;
IntraActivityWaiter server_to_client_push_waiter_;
IntraActivityWaiter server_trailing_metadata_waiter_;
};
template <typename Fn>
class ServerTrailingMetadataInterceptor {
public:
@ -1605,8 +1368,7 @@ class CallFilters {
}
private:
template <typename Output, void (filters_detail::CallState::*on_done)(),
typename Input>
template <typename Output, void (CallState::*on_done)(), typename Input>
Poll<ValueOrFailure<Output>> FinishStep(
Poll<filters_detail::ResultOr<Input>> p) {
auto* r = p.value_if_ready();
@ -1623,7 +1385,7 @@ class CallFilters {
Input(CallFilters::*input_location),
filters_detail::Layout<filters_detail::FallibleOperator<Input>>(
filters_detail::StackData::*layout),
void (filters_detail::CallState::*on_done)()>
void (CallState::*on_done)()>
auto RunExecutor() {
DCHECK_NE((this->*input_location).get(), nullptr);
filters_detail::OperationExecutor<Input> executor;
@ -1642,11 +1404,10 @@ class CallFilters {
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() {
call_state_.BeginPullClientInitialMetadata();
return RunExecutor<
ClientMetadataHandle, ClientMetadataHandle,
&CallFilters::push_client_initial_metadata_,
&filters_detail::StackData::client_initial_metadata,
&filters_detail::CallState::FinishPullClientInitialMetadata>();
return RunExecutor<ClientMetadataHandle, ClientMetadataHandle,
&CallFilters::push_client_initial_metadata_,
&filters_detail::StackData::client_initial_metadata,
&CallState::FinishPullClientInitialMetadata>();
}
// Server: Push server initial metadata
// Returns a promise that resolves to a StatusFlag indicating success
@ -1671,8 +1432,7 @@ class CallFilters {
ServerMetadataHandle,
&CallFilters::push_server_initial_metadata_,
&filters_detail::StackData::server_initial_metadata,
&filters_detail::CallState::
FinishPullServerInitialMetadata>(),
&CallState::FinishPullServerInitialMetadata>(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) {
if (r.ok()) return std::move(*r);
return absl::optional<ServerMetadataHandle>{};
@ -1709,8 +1469,7 @@ class CallFilters {
absl::optional<MessageHandle>, MessageHandle,
&CallFilters::push_client_to_server_message_,
&filters_detail::StackData::client_to_server_messages,
&filters_detail::CallState::
FinishPullClientToServerMessage>();
&CallState::FinishPullClientToServerMessage>();
},
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return absl::optional<MessageHandle>();
@ -1739,8 +1498,7 @@ class CallFilters {
absl::optional<MessageHandle>, MessageHandle,
&CallFilters::push_server_to_client_message_,
&filters_detail::StackData::server_to_client_messages,
&filters_detail::CallState::
FinishPullServerToClientMessage>();
&CallState::FinishPullServerToClientMessage>();
},
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return absl::optional<MessageHandle>();
@ -1802,7 +1560,7 @@ class CallFilters {
RefCountedPtr<Stack> stack_;
filters_detail::CallState call_state_;
CallState call_state_;
void* call_data_;
ClientMetadataHandle push_client_initial_metadata_;

@ -0,0 +1,39 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_state.h"
namespace grpc_core {
std::string CallState::DebugString() const {
return absl::StrCat(
"client_to_server_pull_state:", client_to_server_pull_state_,
" client_to_server_push_state:", client_to_server_push_state_,
" server_to_client_pull_state:", server_to_client_pull_state_,
" server_to_client_message_push_state:", server_to_client_push_state_,
" server_trailing_metadata_state:", server_trailing_metadata_state_,
client_to_server_push_waiter_.DebugString(),
" server_to_client_push_waiter:",
server_to_client_push_waiter_.DebugString(),
" client_to_server_pull_waiter:",
client_to_server_pull_waiter_.DebugString(),
" server_to_client_pull_waiter:",
server_to_client_pull_waiter_.DebugString(),
" server_trailing_metadata_waiter:",
server_trailing_metadata_waiter_.DebugString());
}
static_assert(sizeof(CallState) <= 16, "CallState too large");
} // namespace grpc_core

@ -0,0 +1,957 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
#include "absl/types/optional.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/status_flag.h"
namespace grpc_core {
class CallState {
public:
CallState();
// Start the call: allows pulls to proceed
void Start();
// PUSH: client -> server
void BeginPushClientToServerMessage();
Poll<StatusFlag> PollPushClientToServerMessage();
void ClientToServerHalfClose();
// PULL: client -> server
void BeginPullClientInitialMetadata();
void FinishPullClientInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
void FinishPullClientToServerMessage();
// PUSH: server -> client
StatusFlag PushServerInitialMetadata();
void BeginPushServerToClientMessage();
Poll<StatusFlag> PollPushServerToClientMessage();
bool PushServerTrailingMetadata(bool cancel);
// PULL: server -> client
Poll<bool> PollPullServerInitialMetadataAvailable();
void FinishPullServerInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
Poll<bool> PollWasCancelled();
// Debug
std::string DebugString() const;
friend std::ostream& operator<<(std::ostream& out,
const CallState& call_state) {
return out << call_state.DebugString();
}
private:
enum class ClientToServerPullState : uint16_t {
// Ready to read: client initial metadata is there, but not yet processed
kBegin,
// Processing client initial metadata
kProcessingClientInitialMetadata,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingClientToServerMessage,
// Processing complete
kTerminated,
};
static const char* ClientToServerPullStateString(
ClientToServerPullState state) {
switch (state) {
case ClientToServerPullState::kBegin:
return "Begin";
case ClientToServerPullState::kProcessingClientInitialMetadata:
return "ProcessingClientInitialMetadata";
case ClientToServerPullState::kIdle:
return "Idle";
case ClientToServerPullState::kReading:
return "Reading";
case ClientToServerPullState::kProcessingClientToServerMessage:
return "ProcessingClientToServerMessage";
case ClientToServerPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPullState state) {
out.Append(ClientToServerPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPullState state) {
return out << ClientToServerPullStateString(state);
}
enum class ClientToServerPushState : uint16_t {
kIdle,
kPushedMessage,
kPushedHalfClose,
kPushedMessageAndHalfClosed,
kFinished,
};
static const char* ClientToServerPushStateString(
ClientToServerPushState state) {
switch (state) {
case ClientToServerPushState::kIdle:
return "Idle";
case ClientToServerPushState::kPushedMessage:
return "PushedMessage";
case ClientToServerPushState::kPushedHalfClose:
return "PushedHalfClose";
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return "PushedMessageAndHalfClosed";
case ClientToServerPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPushState state) {
out.Append(ClientToServerPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPushState state) {
return out << ClientToServerPushStateString(state);
}
enum class ServerToClientPullState : uint16_t {
// Not yet started: cannot read
kUnstarted,
kUnstartedReading,
kStarted,
kStartedReading,
// Processing server initial metadata
kProcessingServerInitialMetadata,
kProcessingServerInitialMetadataReading,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingServerToClientMessage,
// Processing server trailing metadata
kProcessingServerTrailingMetadata,
kTerminated,
};
static const char* ServerToClientPullStateString(
ServerToClientPullState state) {
switch (state) {
case ServerToClientPullState::kUnstarted:
return "Unstarted";
case ServerToClientPullState::kUnstartedReading:
return "UnstartedReading";
case ServerToClientPullState::kStarted:
return "Started";
case ServerToClientPullState::kStartedReading:
return "StartedReading";
case ServerToClientPullState::kProcessingServerInitialMetadata:
return "ProcessingServerInitialMetadata";
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return "ProcessingServerInitialMetadataReading";
case ServerToClientPullState::kIdle:
return "Idle";
case ServerToClientPullState::kReading:
return "Reading";
case ServerToClientPullState::kProcessingServerToClientMessage:
return "ProcessingServerToClientMessage";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
return "ProcessingServerTrailingMetadata";
case ServerToClientPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPullState state) {
out.Append(ServerToClientPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPullState state) {
return out << ServerToClientPullStateString(state);
}
enum class ServerToClientPushState : uint16_t {
kStart,
kPushedServerInitialMetadata,
kPushedServerInitialMetadataAndPushedMessage,
kTrailersOnly,
kIdle,
kPushedMessage,
kFinished,
};
static const char* ServerToClientPushStateString(
ServerToClientPushState state) {
switch (state) {
case ServerToClientPushState::kStart:
return "Start";
case ServerToClientPushState::kPushedServerInitialMetadata:
return "PushedServerInitialMetadata";
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
return "PushedServerInitialMetadataAndPushedMessage";
case ServerToClientPushState::kTrailersOnly:
return "TrailersOnly";
case ServerToClientPushState::kIdle:
return "Idle";
case ServerToClientPushState::kPushedMessage:
return "PushedMessage";
case ServerToClientPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPushState state) {
out.Append(ServerToClientPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPushState state) {
return out << ServerToClientPushStateString(state);
}
enum class ServerTrailingMetadataState : uint16_t {
kNotPushed,
kPushed,
kPushedCancel,
kPulled,
kPulledCancel,
};
static const char* ServerTrailingMetadataStateString(
ServerTrailingMetadataState state) {
switch (state) {
case ServerTrailingMetadataState::kNotPushed:
return "NotPushed";
case ServerTrailingMetadataState::kPushed:
return "Pushed";
case ServerTrailingMetadataState::kPushedCancel:
return "PushedCancel";
case ServerTrailingMetadataState::kPulled:
return "Pulled";
case ServerTrailingMetadataState::kPulledCancel:
return "PulledCancel";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
out.Append(ServerTrailingMetadataStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerTrailingMetadataState state) {
return out << ServerTrailingMetadataStateString(state);
}
ClientToServerPullState client_to_server_pull_state_ : 3;
ClientToServerPushState client_to_server_push_state_ : 3;
ServerToClientPullState server_to_client_pull_state_ : 4;
ServerToClientPushState server_to_client_push_state_ : 3;
ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
IntraActivityWaiter client_to_server_pull_waiter_;
IntraActivityWaiter server_to_client_pull_waiter_;
IntraActivityWaiter client_to_server_push_waiter_;
IntraActivityWaiter server_to_client_push_waiter_;
IntraActivityWaiter server_trailing_metadata_waiter_;
};
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState()
: client_to_server_pull_state_(ClientToServerPullState::kBegin),
client_to_server_push_state_(ClientToServerPushState::kIdle),
server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
server_to_client_push_state_(ServerToClientPushState::kStart),
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] Start: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kStarted;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kUnstartedReading:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
return Success{};
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::ClientToServerHalfClose() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] ClientToServerHalfClose: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ =
ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientInitialMetadata;
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
CallState::PollPullClientToServerMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullClientToServerMessageAvailable: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
"processing a message";
break;
case ClientToServerPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientToServerMessage;
return true;
case ClientToServerPushState::kPushedHalfClose:
return false;
case ClientToServerPushState::kFinished:
client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
break;
case ClientToServerPullState::kIdle:
LOG(FATAL) << "FinishPullClientToServerMessage called twice";
break;
case ClientToServerPullState::kReading:
LOG(FATAL) << "FinishPullClientToServerMessage called before "
"PollPullClientToServerMessageAvailable";
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kTerminated:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ = ClientToServerPushState::kIdle;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
break;
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag
CallState::PushServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_,
server_trailing_metadata_state_);
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return Failure{};
}
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadata;
server_to_client_push_waiter_.Wake();
return Success{};
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "BeginPushServerToClientMessage called before "
"PushServerInitialMetadata";
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
break;
case ServerToClientPushState::kTrailersOnly:
// Will fail in poll.
break;
case ServerToClientPushState::kIdle:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
LOG(FATAL) << "PollPushServerToClientMessage called before "
<< "PushServerInitialMetadata";
case ServerToClientPushState::kTrailersOnly:
return false;
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
return Success{};
case ServerToClientPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
CallState::PushServerTrailingMetadata(bool cancel) {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
server_to_client_push_state_,
client_to_server_push_state_,
server_trailing_metadata_waiter_.DebugString());
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_state_ =
cancel ? ServerTrailingMetadataState::kPushedCancel
: ServerTrailingMetadataState::kPushed;
server_trailing_metadata_waiter_.Wake();
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kIdle:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kFinished:
case ServerToClientPushState::kTrailersOnly:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kFinished:
break;
}
return true;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
CallState::PollPullServerInitialMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerInitialMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
bool reading;
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
return false;
}
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
reading = true;
break;
case ServerToClientPullState::kStarted:
reading = false;
break;
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
server_to_client_pull_state_ ==
ServerToClientPullState::kStartedReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_pull_state_ =
reading
? ServerToClientPullState::kProcessingServerInitialMetadataReading
: ServerToClientPullState::kProcessingServerInitialMetadata;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL)
<< "PollPullServerInitialMetadataAvailable after metadata processed";
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return false;
case ServerToClientPushState::kTrailersOnly:
return false;
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
CHECK_EQ(server_to_client_push_state_,
ServerToClientPushState::kTrailersOnly);
return;
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
server_to_client_pull_state_ == ServerToClientPullState::kReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
"metadata consumed";
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kFinished:
LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
CallState::PollPullServerToClientMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerToClientMessageAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerInitialMetadataReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kStartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
return false;
}
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_waiter_.pending();
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kTrailersOnly:
DCHECK_NE(server_trailing_metadata_state_,
ServerTrailingMetadataState::kNotPushed);
return false;
case ServerToClientPushState::kPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerToClientMessage;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
LOG(FATAL)
<< "FinishPullServerToClientMessage called before metadata available";
case ServerToClientPullState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called twice";
case ServerToClientPullState::kReading:
LOG(FATAL) << "FinishPullServerToClientMessage called before "
<< "PollPullServerToClientMessageAvailable";
case ServerToClientPullState::kProcessingServerToClientMessage:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
}
switch (server_to_client_push_state_) {
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
"metadata consumed";
case ServerToClientPushState::kTrailersOnly:
LOG(FATAL) << "FinishPullServerToClientMessage called after "
"PushServerTrailingMetadata";
case ServerToClientPushState::kPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
case ServerToClientPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
CallState::PollServerTrailingMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollServerTrailingMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kUnstartedReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kReading:
switch (server_to_client_push_state_) {
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kStart:
case ServerToClientPushState::kFinished:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
}
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPushedCancel:
server_trailing_metadata_state_ =
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
CallState::PollWasCancelled() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollWasCancelled: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel: {
return server_trailing_metadata_waiter_.pending();
}
case ServerTrailingMetadataState::kPulled:
return false;
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
} // namespace grpc_core
#endif

@ -86,6 +86,21 @@ grpc_cc_test(
"//test/core/promise:poll_matcher",
],
)
grpc_cc_test(
name = "call_state_test",
srcs = ["call_state_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:call_state",
"//test/core/promise:poll_matcher",
],
)
grpc_cc_test(
name = "connectivity_state_test",

@ -1166,246 +1166,6 @@ TEST(InfallibleOperationExecutor, InstantTwo) {
gpr_free_aligned(call_data);
}
///////////////////////////////////////////////////////////////////////////////
// CallState
TEST(CallStateTest, NoOp) { CallState state; }
TEST(CallStateTest, StartTwiceCrashes) {
CallState state;
state.Start();
EXPECT_DEATH(state.Start(), "");
}
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady());
}
TEST(CallStateTest, PullClientInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), "");
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
}
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 3: push before polling and half close
state.BeginPushClientToServerMessage();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// ... and now we should see the half close
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ImmediateClientToServerHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(),
IsReady(true)));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedServerToClientMessages) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerInitialMetadata();
state.Start();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 3: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, ReceiveTrailersOnly) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
TEST(CallStateTest, RecallCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false));
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false));
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////

@ -0,0 +1,310 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_state.h"
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "test/core/promise/poll_matcher.h"
using testing::Mock;
using testing::StrictMock;
namespace grpc_core {
namespace {
// A mock activity that can be activated and deactivated.
class MockActivity : public Activity, public Wakeable {
public:
MOCK_METHOD(void, WakeupRequested, ());
void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
void Orphan() override {}
Waker MakeOwningWaker() override { return Waker(this, 0); }
Waker MakeNonOwningWaker() override { return Waker(this, 0); }
void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
void Drop(WakeupMask /*mask*/) override {}
std::string DebugTag() const override { return "MockActivity"; }
std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
return DebugTag();
}
void Activate() {
if (scoped_activity_ == nullptr) {
scoped_activity_ = std::make_unique<ScopedActivity>(this);
}
}
void Deactivate() { scoped_activity_.reset(); }
private:
std::unique_ptr<ScopedActivity> scoped_activity_;
};
#define EXPECT_WAKEUP(activity, statement) \
EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
statement; \
Mock::VerifyAndClearExpectations(&(activity));
} // namespace
TEST(CallStateTest, NoOp) { CallState state; }
TEST(CallStateTest, StartTwiceCrashes) {
CallState state;
state.Start();
EXPECT_DEATH(state.Start(), "");
}
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady());
}
TEST(CallStateTest, PullClientInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), "");
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
}
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 3: push before polling and half close
state.BeginPushClientToServerMessage();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// ... and now we should see the half close
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ImmediateClientToServerHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(),
IsReady(true)));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedServerToClientMessages) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerInitialMetadata();
state.Start();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 3: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, ReceiveTrailersOnly) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
TEST(CallStateTest, RecallCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false));
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false));
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_tracer_init();
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save