Convert filter to a promise (#28815)

* Convert filter to a promise

* copy/paste fix

* fix
pull/28773/head
Craig Tiller 3 years ago committed by GitHub
parent 8adc21f78d
commit 361809aabb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 101
      test/core/transport/chttp2/streams_not_seen_test.cc

@ -33,8 +33,10 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel.h"
#include "test/core/end2end/cq_verifier.h"
@ -53,6 +55,28 @@ class TrailingMetadataRecordingFilter {
public:
static grpc_channel_filter kFilterVtable;
static absl::StatusOr<TrailingMetadataRecordingFilter> Create(
const grpc_channel_args*) {
return TrailingMetadataRecordingFilter();
}
static constexpr bool is_client() { return true; }
static constexpr const char* name() {
return "trailing-metadata-recording-filter";
}
ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) {
return ArenaPromise<TrailingMetadata>(
Seq(next_promise_factory(std::move(initial_metadata)),
[](TrailingMetadata trailing_metadata) {
stream_network_state_ =
trailing_metadata->get(GrpcStreamNetworkState());
return Immediate(std::move(trailing_metadata));
}));
}
static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state() {
return stream_network_state_;
@ -63,85 +87,12 @@ class TrailingMetadataRecordingFilter {
}
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* calld = static_cast<CallData*>(elem->call_data);
if (batch->recv_trailing_metadata) {
calld->recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_;
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* /*args*/) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReady, this, nullptr);
}
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
auto* calld = static_cast<CallData*>(arg);
stream_network_state_ =
calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
}
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) TrailingMetadataRecordingFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand =
static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
chand->~TrailingMetadataRecordingFilter();
}
static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state_;
};
grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(TrailingMetadataRecordingFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"TrailingMetadataRecordingFilter",
};
grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable =
MakePromiseBasedFilter<TrailingMetadataRecordingFilter>();
absl::optional<GrpcStreamNetworkState::ValueType>
TrailingMetadataRecordingFilter::stream_network_state_;

Loading…
Cancel
Save