Set trailing_metadata_available for recv_initial_metadata ops when generating a fake status (#28827)

* Set trailing_metadata_available for recv_initial_metadata ops when generating a fake status

* Remove log

* Fix

* Revert "Convert filter to a promise (#28815)"

This reverts commit 361809aabb.

* Add testing
pull/28722/head
Yash Tibrewal 3 years ago committed by GitHub
parent 9cb0747ab0
commit 20aec3b2c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 7
      src/core/lib/transport/transport.h
  3. 134
      test/core/transport/chttp2/streams_not_seen_test.cc

@ -1860,6 +1860,15 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(
}
}
*s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
// If we didn't receive initial metadata from the wire and instead faked a
// status (due to stream cancellations for example), let upper layers know
// that trailing metadata is immediately available.
if (s->trailing_metadata_available != nullptr &&
s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
*s->trailing_metadata_available = true;
s->trailing_metadata_available = nullptr;
}
null_then_sched_closure(&s->recv_initial_metadata_ready);
}
}

@ -345,8 +345,11 @@ struct grpc_transport_stream_op_batch_payload {
/** Should be enqueued when initial metadata is ready to be processed. */
grpc_closure* recv_initial_metadata_ready = nullptr;
// If not NULL, will be set to true if trailing metadata is
// immediately available. This may be a signal that we received a
// Trailers-Only response.
// immediately available. This may be a signal that we received a
// Trailers-Only response. The retry filter checks this to know whether to
// defer the decision to commit the call or not. The C++ callback API also
// uses this to set the success flag of OnReadInitialMetadataDone()
// callback.
bool* trailing_metadata_available = nullptr;
// If non-NULL, will be set by the transport to the peer string (a char*).
// The transport retains ownership of the string.

@ -34,10 +34,8 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel.h"
#include "test/core/end2end/cq_verifier.h"
@ -51,26 +49,17 @@ namespace {
void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
// A filter that fails all batches with send ops.
// A filter that records state about trailing metadata.
class TrailingMetadataRecordingFilter {
public:
static grpc_channel_filter kFilterVtable;
static absl::StatusOr<TrailingMetadataRecordingFilter> Create(
const grpc_channel_args*) {
return TrailingMetadataRecordingFilter();
static bool trailing_metadata_available() {
return trailing_metadata_available_;
}
ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) {
return ArenaPromise<TrailingMetadata>(
Seq(next_promise_factory(std::move(initial_metadata)),
[](TrailingMetadata trailing_metadata) {
stream_network_state_ =
trailing_metadata->get(GrpcStreamNetworkState());
return Immediate(std::move(trailing_metadata));
}));
static void reset_trailing_metadata_available() {
trailing_metadata_available_ = false;
}
static absl::optional<GrpcStreamNetworkState::ValueType>
@ -82,15 +71,114 @@ class TrailingMetadataRecordingFilter {
stream_network_state_ = absl::nullopt;
}
static void reset_state() {
reset_trailing_metadata_available();
reset_stream_network_state();
}
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* calld = static_cast<CallData*>(elem->call_data);
if (batch->recv_initial_metadata) {
calld->trailing_metadata_available_ =
batch->payload->recv_initial_metadata.trailing_metadata_available;
calld->original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready_;
}
if (batch->recv_trailing_metadata) {
calld->recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_;
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* /*args*/) {
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
this, nullptr);
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReady, this, nullptr);
}
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error) {
auto* calld = static_cast<CallData*>(arg);
TrailingMetadataRecordingFilter::trailing_metadata_available_ =
*calld->trailing_metadata_available_;
Closure::Run(DEBUG_LOCATION, calld->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(error));
}
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
auto* calld = static_cast<CallData*>(arg);
stream_network_state_ =
calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
}
bool* trailing_metadata_available_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) TrailingMetadataRecordingFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand =
static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
chand->~TrailingMetadataRecordingFilter();
}
static bool trailing_metadata_available_;
static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state_;
};
grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable =
MakePromiseBasedFilter<TrailingMetadataRecordingFilter,
FilterEndpoint::kClient>(
"trailing-metadata-recording-filter");
grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(TrailingMetadataRecordingFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"trailing-metadata-recording-filter",
};
bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
absl::optional<GrpcStreamNetworkState::ValueType>
TrailingMetadataRecordingFilter::stream_network_state_;
@ -99,7 +187,7 @@ class StreamsNotSeenTest : public ::testing::Test {
explicit StreamsNotSeenTest(bool server_allows_streams = true)
: server_allows_streams_(server_allows_streams) {
// Reset the filter state
TrailingMetadataRecordingFilter::reset_stream_network_state();
TrailingMetadataRecordingFilter::reset_state();
grpc_slice_buffer_init(&read_buffer_);
GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
// Start the test tcp server
@ -362,6 +450,7 @@ TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
cq_verify(cqv_);
// Verify status and metadata
EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
ASSERT_TRUE(
TrailingMetadataRecordingFilter::stream_network_state().has_value());
EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
@ -500,6 +589,7 @@ TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
cq_verify(cqv_);
// Verify status and metadata
EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
ASSERT_TRUE(
TrailingMetadataRecordingFilter::stream_network_state().has_value());
EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
@ -577,6 +667,7 @@ TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
cq_verify(cqv_);
// Verify status and metadata
EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
ASSERT_TRUE(
TrailingMetadataRecordingFilter::stream_network_state().has_value());
EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
@ -642,6 +733,7 @@ TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
cq_verify(cqv_);
// Verify status and metadata
EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
ASSERT_TRUE(
TrailingMetadataRecordingFilter::stream_network_state().has_value());
EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),

Loading…
Cancel
Save