[call-v3] Implement failed_before_recv_message() on {Client,Server}Call (#37810)

Closes #37810

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37810 from ctiller:CANCELLED 134f49a07b
PiperOrigin-RevId: 681236011
pull/37837/head
Craig Tiller 5 months ago committed by Copybara-Service
parent d6fd0bd991
commit 863aca8949
  1. 4
      src/core/lib/surface/client_call.h
  2. 4
      src/core/lib/surface/server_call.h
  3. 7
      src/core/lib/transport/call_filters.h
  4. 8
      src/core/lib/transport/call_spine.h
  5. 23
      src/core/lib/transport/call_state.h
  6. 13
      src/core/util/dump_args.h
  7. 6
      test/core/transport/call_state_test.cc

@ -98,7 +98,9 @@ class ClientCall final
char* GetPeer() override; char* GetPeer() override;
bool Completed() final { Crash("unimplemented"); } bool Completed() final { Crash("unimplemented"); }
bool failed_before_recv_message() const final { Crash("unimplemented"); } bool failed_before_recv_message() const final {
return started_call_initiator_.WasCancelledPushed();
}
grpc_compression_algorithm incoming_compression_algorithm() override { grpc_compression_algorithm incoming_compression_algorithm() override {
return message_receiver_.incoming_compression_algorithm(); return message_receiver_.incoming_compression_algorithm();

@ -129,7 +129,9 @@ class ServerCall final : public Call, public DualRefCounted<ServerCall> {
} }
bool Completed() final { Crash("unimplemented"); } bool Completed() final { Crash("unimplemented"); }
bool failed_before_recv_message() const final { Crash("unimplemented"); } bool failed_before_recv_message() const final {
return call_handler_.WasCancelledPushed();
}
uint32_t test_only_message_flags() override { uint32_t test_only_message_flags() override {
return message_receiver_.last_message_flags(); return message_receiver_.last_message_flags();

@ -1590,10 +1590,17 @@ class CallFilters {
GRPC_MUST_USE_RESULT auto WasCancelled() { GRPC_MUST_USE_RESULT auto WasCancelled() {
return [this]() { return call_state_.PollWasCancelled(); }; return [this]() { return call_state_.PollWasCancelled(); };
} }
// Client & server: returns true if server trailing metadata has been pushed
// *and* contained a cancellation, false otherwise.
GRPC_MUST_USE_RESULT bool WasCancelledPushed() const {
return call_state_.WasCancelledPushed();
}
// Returns true if server trailing metadata has been pulled // Returns true if server trailing metadata has been pulled
bool WasServerTrailingMetadataPulled() const { bool WasServerTrailingMetadataPulled() const {
return call_state_.WasServerTrailingMetadataPulled(); return call_state_.WasServerTrailingMetadataPulled();
} }
// Client & server: fill in final_info with the final status of the call. // Client & server: fill in final_info with the final status of the call.
void Finalize(const grpc_call_final_info* final_info); void Finalize(const grpc_call_final_info* final_info);

@ -263,6 +263,10 @@ class CallInitiator {
return spine_->SpawnWaitable(name, std::move(promise_factory)); return spine_->SpawnWaitable(name, std::move(promise_factory));
} }
bool WasCancelledPushed() const {
return spine_->call_filters().WasCancelledPushed();
}
Arena* arena() { return spine_->arena(); } Arena* arena() { return spine_->arena(); }
Party* party() { return spine_.get(); } Party* party() { return spine_.get(); }
@ -304,6 +308,10 @@ class CallHandler {
auto WasCancelled() { return spine_->WasCancelled(); } auto WasCancelled() { return spine_->WasCancelled(); }
bool WasCancelledPushed() const {
return spine_->call_filters().WasCancelledPushed();
}
template <typename PromiseFactory> template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory, void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) { DebugLocation whence = {}) {

@ -52,8 +52,14 @@ class CallState {
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable(); Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage(); void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable(); Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
bool WasServerTrailingMetadataPulled() const; bool WasServerTrailingMetadataPulled() const;
// Resolves after server trailing metadata has been pulled, to true if the
// call was cancelled, and false otherwise.
Poll<bool> PollWasCancelled(); Poll<bool> PollWasCancelled();
// Return true if server trailing metadata has been pushed *and* that push was
// a cancellation.
bool WasCancelledPushed() const;
// Debug // Debug
std::string DebugString() const; std::string DebugString() const;
@ -939,6 +945,23 @@ CallState::PollWasCancelled() {
Crash("Unreachable"); Crash("Unreachable");
} }
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool CallState::WasCancelledPushed()
const {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollWasCancelledPushed: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPushed:
return false;
case ServerTrailingMetadataState::kPushedCancel:
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
} // namespace grpc_core } // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H

@ -78,20 +78,25 @@ class DumpArgs {
return 0; return 0;
} }
int AddDumper(void** p) { int AddDumper(void const* const* p) {
arg_dumpers_.push_back( arg_dumpers_.push_back(
[p](CustomSink& os) { os.Append(absl::StrFormat("%p", *p)); }); [p](CustomSink& os) { os.Append(absl::StrFormat("%p", *p)); });
return 0; return 0;
} }
template <typename T> template <typename T>
int AddDumper(T** p) { int AddDumper(T const* const* p) {
return AddDumper(reinterpret_cast<void**>(p)); return AddDumper(reinterpret_cast<void const* const*>(p));
} }
template <typename T> template <typename T>
int AddDumper(T* const* p) { int AddDumper(T* const* p) {
return AddDumper(const_cast<T**>(p)); return AddDumper(const_cast<T const* const*>(p));
}
template <typename T>
int AddDumper(T const** p) {
return AddDumper(const_cast<T const* const*>(p));
} }
void Stringify(CustomSink& sink) const; void Stringify(CustomSink& sink) const;

@ -262,11 +262,14 @@ TEST(CallStateTest, RecallNoCancellation) {
activity.Activate(); activity.Activate();
CallState state; CallState state;
state.Start(); state.Start();
EXPECT_EQ(state.WasCancelledPushed(), false);
state.PushServerTrailingMetadata(false); state.PushServerTrailingMetadata(false);
EXPECT_EQ(state.WasCancelledPushed(), false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata(); state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false)); EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
EXPECT_EQ(state.WasCancelledPushed(), false);
} }
TEST(CallStateTest, RecallCancellation) { TEST(CallStateTest, RecallCancellation) {
@ -274,11 +277,14 @@ TEST(CallStateTest, RecallCancellation) {
activity.Activate(); activity.Activate();
CallState state; CallState state;
state.Start(); state.Start();
EXPECT_EQ(state.WasCancelledPushed(), false);
state.PushServerTrailingMetadata(true); state.PushServerTrailingMetadata(true);
EXPECT_EQ(state.WasCancelledPushed(), true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata(); state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true)); EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
EXPECT_EQ(state.WasCancelledPushed(), true);
} }
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) { TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {

Loading…
Cancel
Save