From f3897a5f7ab666f015af2ce05b56eed10243f804 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 Apr 2022 09:46:43 -0700 Subject: [PATCH] Fault injection filter -> promises (#29288) * Fault injection filter -> promises --- BUILD | 2 + .../fault_injection/fault_injection_filter.cc | 472 ++++-------------- .../fault_injection/fault_injection_filter.h | 25 +- src/core/ext/xds/xds_http_fault_filter.cc | 2 +- src/core/lib/channel/promise_based_filter.h | 13 +- 5 files changed, 138 insertions(+), 376 deletions(-) diff --git a/BUILD b/BUILD index 1a75a0dd674..5e542935789 100644 --- a/BUILD +++ b/BUILD @@ -2691,6 +2691,8 @@ grpc_cc_library( "grpc_base", "grpc_service_config", "json_util", + "sleep", + "try_seq", ], ) diff --git a/src/core/ext/filters/fault_injection/fault_injection_filter.cc b/src/core/ext/filters/fault_injection/fault_injection_filter.cc index 1d065e4b512..c93c084aa75 100644 --- a/src/core/ext/filters/fault_injection/fault_injection_filter.cc +++ b/src/core/ext/filters/fault_injection/fault_injection_filter.cc @@ -29,11 +29,11 @@ #include "src/core/ext/filters/fault_injection/service_config_parser.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/status_util.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/promise/sleep.h" +#include "src/core/lib/promise/try_seq.h" #include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/transport/status_conversion.h" +#include "src/core/lib/transport/transport.h" namespace grpc_core { @@ -62,434 +62,166 @@ inline bool UnderFraction(const uint32_t numerator, return random_number < numerator; } -class ChannelData { - public: - static grpc_error_handle Init(grpc_channel_element* elem, - grpc_channel_element_args* args); - static void Destroy(grpc_channel_element* elem); - - int index() const { return index_; } - size_t service_config_parser_index() const { - return service_config_parser_index_; - } - - private: - ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args); - ~ChannelData() = default; - - // The relative index of instances of the same filter. - int index_; - const size_t service_config_parser_index_; -}; +} // namespace -class CallData { +class FaultInjectionFilter::InjectionDecision { public: - static grpc_error_handle Init(grpc_call_element* elem, - const grpc_call_element_args* args); - - static void Destroy(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*then_schedule_closure*/); + InjectionDecision(uint32_t max_faults, Duration delay_time, + absl::optional abort_request) + : max_faults_(max_faults), + delay_time_(delay_time), + abort_request_(abort_request) {} - static void StartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + std::string ToString() const; + Timestamp DelayUntil() const; + absl::Status MaybeAbort() const; private: - class ResumeBatchCanceller; - - CallData(grpc_call_element* elem, const grpc_call_element_args* args); - ~CallData(); - - void DecideWhetherToInjectFaults(grpc_metadata_batch* initial_metadata); - - // Checks if current active faults exceed the allowed max faults. - bool HaveActiveFaultsQuota(bool increment); - - // Returns true if this RPC needs to be delayed. If so, this call will be - // counted as an active fault. - bool MaybeDelay(); - - // Returns the aborted RPC status if this RPC needs to be aborted. If so, - // this call will be counted as an active fault. Otherwise, it returns - // GRPC_ERROR_NONE. - // If this call is already been delay injected, skip the active faults - // quota check. - grpc_error_handle MaybeAbort(); - - // Delays the stream operations batch. - void DelayBatch(grpc_call_element* elem, - grpc_transport_stream_op_batch* batch); - - // Cancels the delay timer. - void CancelDelayTimer() { grpc_timer_cancel(&delay_timer_); } - - // Finishes the fault injection, should only be called once. - void FaultInjectionFinished() { - g_active_faults.fetch_sub(1, std::memory_order_relaxed); - } + bool HaveActiveFaultsQuota(bool increment) const; - // This is a callback that will be invoked after the delay timer is up. - static void ResumeBatch(void* arg, grpc_error_handle error); - - // This is a callback invoked upon completion of recv_trailing_metadata. - // Injects the abort_error_ to the recv_trailing_metadata batch if needed. - static void HijackedRecvTrailingMetadataReady(void* arg, grpc_error_handle); - - // Used to track the policy structs that needs to be destroyed in dtor. - bool fi_policy_owned_ = false; - const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy_; - grpc_call_stack* owning_call_; - Arena* arena_; - CallCombiner* call_combiner_; - - // Indicates whether we are doing a delay and/or an abort for this call. - bool delay_request_ = false; - bool abort_request_ = false; - - // Delay states - grpc_timer delay_timer_ ABSL_GUARDED_BY(delay_mu_); - ResumeBatchCanceller* resume_batch_canceller_ ABSL_GUARDED_BY(delay_mu_); - grpc_transport_stream_op_batch* delayed_batch_ ABSL_GUARDED_BY(delay_mu_); - // Abort states - grpc_error_handle abort_error_ = GRPC_ERROR_NONE; - grpc_closure recv_trailing_metadata_ready_; - grpc_closure* original_recv_trailing_metadata_ready_; - // Protects the asynchronous delay, resume, and cancellation. - Mutex delay_mu_; + uint32_t max_faults_; + Duration delay_time_; + absl::optional abort_request_; }; -// ChannelData - -grpc_error_handle ChannelData::Init(grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(elem->filter == &FaultInjectionFilterVtable); - new (elem->channel_data) ChannelData(elem, args); - return GRPC_ERROR_NONE; +absl::StatusOr FaultInjectionFilter::Create( + ChannelArgs, ChannelFilter::Args filter_args) { + return FaultInjectionFilter(filter_args); } -void ChannelData::Destroy(grpc_channel_element* elem) { - auto* chand = static_cast(elem->channel_data); - chand->~ChannelData(); -} - -ChannelData::ChannelData(grpc_channel_element* elem, - grpc_channel_element_args* args) - : index_( - grpc_channel_stack_filter_instance_number(args->channel_stack, elem)), +FaultInjectionFilter::FaultInjectionFilter(ChannelFilter::Args filter_args) + : index_(grpc_channel_stack_filter_instance_number( + filter_args.channel_stack(), + filter_args.uninitialized_channel_element())), service_config_parser_index_( FaultInjectionServiceConfigParser::ParserIndex()) {} -// CallData::ResumeBatchCanceller - -class CallData::ResumeBatchCanceller { - public: - explicit ResumeBatchCanceller(grpc_call_element* elem) : elem_(elem) { - auto* calld = static_cast(elem->call_data); - GRPC_CALL_STACK_REF(calld->owning_call_, "ResumeBatchCanceller"); - GRPC_CLOSURE_INIT(&closure_, &Cancel, this, grpc_schedule_on_exec_ctx); - calld->call_combiner_->SetNotifyOnCancel(&closure_); - } - - private: - static void Cancel(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - auto* chand = static_cast(self->elem_->channel_data); - auto* calld = static_cast(self->elem_->call_data); - { - MutexLock lock(&calld->delay_mu_); - if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling schdueled pick: " - "error=%s self=%p calld->resume_batch_canceller_=%p", - chand, calld, grpc_error_std_string(error).c_str(), self, - calld->resume_batch_canceller_); - } - if (error != GRPC_ERROR_NONE && calld->resume_batch_canceller_ == self) { - // Cancel the delayed pick. - calld->CancelDelayTimer(); - calld->FaultInjectionFinished(); - // Fail pending batches on the call. - grpc_transport_stream_op_batch_finish_with_failure( - calld->delayed_batch_, GRPC_ERROR_REF(error), - calld->call_combiner_); - } - } - GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResumeBatchCanceller"); - delete self; - } - - grpc_call_element* elem_; - grpc_closure closure_; -}; - -// CallData - -grpc_error_handle CallData::Init(grpc_call_element* elem, - const grpc_call_element_args* args) { - auto* calld = new (elem->call_data) CallData(elem, args); - if (calld->fi_policy_ == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "failed to find fault injection policy"); - } - return GRPC_ERROR_NONE; -} - -void CallData::Destroy(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*then_schedule_closure*/) { - auto* calld = static_cast(elem->call_data); - calld->~CallData(); -} - -void CallData::StartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - auto* calld = static_cast(elem->call_data); - // There should only be one send_initial_metdata op, and fault injection also - // only need to be enforced once. - if (batch->send_initial_metadata) { - calld->DecideWhetherToInjectFaults( - batch->payload->send_initial_metadata.send_initial_metadata); - if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: Fault injection triggered delay=%d abort=%d", - elem->channel_data, calld, calld->delay_request_, - calld->abort_request_); - } - if (calld->MaybeDelay()) { - // Delay the batch, and pass down the batch in the scheduled closure. - calld->DelayBatch(elem, batch); - return; - } - grpc_error_handle abort_error = calld->MaybeAbort(); - if (abort_error != GRPC_ERROR_NONE) { - calld->abort_error_ = abort_error; - grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_); - return; - } - } else { - if (batch->recv_trailing_metadata) { - // Intercept recv_trailing_metadata callback so that we can inject the - // failure when aborting streaming calls, because their - // recv_trailing_metatdata op may not be on the same batch as the - // send_initial_metadata op. - calld->original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_; - } - if (calld->abort_error_ != GRPC_ERROR_NONE) { - // If we already decided to abort, then immediately fail this batch. - grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_); - return; - } +// Construct a promise for one call. +ArenaPromise FaultInjectionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto decision = MakeInjectionDecision(call_args.client_initial_metadata); + if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) { + gpr_log(GPR_INFO, "chand=%p: Fault injection triggered %s", this, + decision.ToString().c_str()); } - // Chain to the next filter. - grpc_call_next_op(elem, batch); + return TrySeq( + Sleep(decision.DelayUntil()), + [decision]() { return decision.MaybeAbort(); }, + next_promise_factory(std::move(call_args))); } -CallData::CallData(grpc_call_element* elem, const grpc_call_element_args* args) - : owning_call_(args->call_stack), - arena_(args->arena), - call_combiner_(args->call_combiner) { - auto* chand = static_cast(elem->channel_data); +FaultInjectionFilter::InjectionDecision +FaultInjectionFilter::MakeInjectionDecision( + const ClientMetadataHandle& initial_metadata) { // Fetch the fault injection policy from the service config, based on the // relative index for which policy should this CallData use. auto* service_config_call_data = static_cast( - args->context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); + GetContext< + grpc_call_context_element>()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] + .value); auto* method_params = static_cast( service_config_call_data->GetMethodParsedConfig( - chand->service_config_parser_index())); + service_config_parser_index_)); + const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy = + nullptr; if (method_params != nullptr) { - fi_policy_ = method_params->fault_injection_policy(chand->index()); + fi_policy = method_params->fault_injection_policy(index_); } - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, - HijackedRecvTrailingMetadataReady, elem, - grpc_schedule_on_exec_ctx); -} -CallData::~CallData() { - if (fi_policy_owned_) { - fi_policy_->~FaultInjectionPolicy(); - } - GRPC_ERROR_UNREF(abort_error_); -} + grpc_status_code abort_code = fi_policy->abort_code; + uint32_t abort_percentage_numerator = fi_policy->abort_percentage_numerator; + uint32_t delay_percentage_numerator = fi_policy->delay_percentage_numerator; + Duration delay = fi_policy->delay; -void CallData::DecideWhetherToInjectFaults( - grpc_metadata_batch* initial_metadata) { - FaultInjectionMethodParsedConfig::FaultInjectionPolicy* copied_policy = - nullptr; // Update the policy with values in initial metadata. - if (!fi_policy_->abort_code_header.empty() || - !fi_policy_->abort_percentage_header.empty() || - !fi_policy_->delay_header.empty() || - !fi_policy_->delay_percentage_header.empty()) { - // Defer the actual copy until the first matched header. - auto maybe_copy_policy_func = [this, &copied_policy]() { - if (copied_policy == nullptr) { - copied_policy = - arena_->New( - *fi_policy_); - } - }; + if (!fi_policy->abort_code_header.empty() || + !fi_policy->abort_percentage_header.empty() || + !fi_policy->delay_header.empty() || + !fi_policy->delay_percentage_header.empty()) { std::string buffer; - if (!fi_policy_->abort_code_header.empty() && - (copied_policy == nullptr || - copied_policy->abort_code == GRPC_STATUS_OK)) { + if (!fi_policy->abort_code_header.empty() && abort_code == GRPC_STATUS_OK) { auto value = initial_metadata->GetStringValue( - fi_policy_->abort_code_header, &buffer); + fi_policy->abort_code_header, &buffer); if (value.has_value()) { - maybe_copy_policy_func(); grpc_status_code_from_int( - AsInt(*value).value_or(GRPC_STATUS_UNKNOWN), - &copied_policy->abort_code); + AsInt(*value).value_or(GRPC_STATUS_UNKNOWN), &abort_code); } } - if (!fi_policy_->abort_percentage_header.empty()) { + if (!fi_policy->abort_percentage_header.empty()) { auto value = initial_metadata->GetStringValue( - fi_policy_->abort_percentage_header, &buffer); + fi_policy->abort_percentage_header, &buffer); if (value.has_value()) { - maybe_copy_policy_func(); - copied_policy->abort_percentage_numerator = - std::min(AsInt(*value).value_or(-1), - fi_policy_->abort_percentage_numerator); + abort_percentage_numerator = std::min( + AsInt(*value).value_or(-1), abort_percentage_numerator); } } - if (!fi_policy_->delay_header.empty() && - (copied_policy == nullptr || - copied_policy->delay == Duration::Zero())) { + if (!fi_policy->delay_header.empty() && delay == Duration::Zero()) { auto value = - initial_metadata->GetStringValue(fi_policy_->delay_header, &buffer); + initial_metadata->GetStringValue(fi_policy->delay_header, &buffer); if (value.has_value()) { - maybe_copy_policy_func(); - copied_policy->delay = Duration::Milliseconds( + delay = Duration::Milliseconds( std::max(AsInt(*value).value_or(0), int64_t(0))); } } - if (!fi_policy_->delay_percentage_header.empty()) { + if (!fi_policy->delay_percentage_header.empty()) { auto value = initial_metadata->GetStringValue( - fi_policy_->delay_percentage_header, &buffer); + fi_policy->delay_percentage_header, &buffer); if (value.has_value()) { - maybe_copy_policy_func(); - copied_policy->delay_percentage_numerator = - std::min(AsInt(*value).value_or(-1), - fi_policy_->delay_percentage_numerator); + delay_percentage_numerator = std::min( + AsInt(*value).value_or(-1), delay_percentage_numerator); } } - if (copied_policy != nullptr) fi_policy_ = copied_policy; } // Roll the dice - delay_request_ = fi_policy_->delay != Duration::Zero() && - UnderFraction(fi_policy_->delay_percentage_numerator, - fi_policy_->delay_percentage_denominator); - abort_request_ = fi_policy_->abort_code != GRPC_STATUS_OK && - UnderFraction(fi_policy_->abort_percentage_numerator, - fi_policy_->abort_percentage_denominator); - if (!delay_request_ && !abort_request_) { - if (copied_policy != nullptr) copied_policy->~FaultInjectionPolicy(); - // No fault injection for this call - } else { - fi_policy_owned_ = copied_policy != nullptr; - } -} - -bool CallData::HaveActiveFaultsQuota(bool increment) { - if (g_active_faults.load(std::memory_order_acquire) >= - fi_policy_->max_faults) { + const bool delay_request = + delay != Duration::Zero() && + UnderFraction(delay_percentage_numerator, + fi_policy->delay_percentage_denominator); + const bool abort_request = + abort_code != GRPC_STATUS_OK && + UnderFraction(abort_percentage_numerator, + fi_policy->abort_percentage_denominator); + + return InjectionDecision( + fi_policy->max_faults, delay_request ? delay : Duration::Zero(), + abort_request ? absl::optional(absl::Status( + static_cast(abort_code), + fi_policy->abort_message)) + : absl::nullopt); +} + +bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota( + bool increment) const { + if (g_active_faults.load(std::memory_order_acquire) >= max_faults_) { return false; } if (increment) g_active_faults.fetch_add(1, std::memory_order_relaxed); return true; } -bool CallData::MaybeDelay() { - if (delay_request_) { - return HaveActiveFaultsQuota(true); +Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() const { + if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota(true)) { + return ExecCtx::Get()->Now() + delay_time_; } - return false; + return Timestamp::InfPast(); } -grpc_error_handle CallData::MaybeAbort() { - if (abort_request_ && (delay_request_ || HaveActiveFaultsQuota(false))) { - return grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_COPIED_STRING(fi_policy_->abort_message.c_str()), - GRPC_ERROR_INT_GRPC_STATUS, fi_policy_->abort_code); +absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort() const { + if (abort_request_.has_value() && + (delay_time_ != Duration::Zero() || HaveActiveFaultsQuota(false))) { + return abort_request_.value(); } - return GRPC_ERROR_NONE; + return absl::OkStatus(); } -void CallData::DelayBatch(grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { - MutexLock lock(&delay_mu_); - delayed_batch_ = batch; - resume_batch_canceller_ = new ResumeBatchCanceller(elem); - Timestamp resume_time = ExecCtx::Get()->Now() + fi_policy_->delay; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, ResumeBatch, elem, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&delay_timer_, resume_time, &batch->handler_private.closure); +std::string FaultInjectionFilter::InjectionDecision::ToString() const { + return absl::StrCat("delay=", delay_time_ != Duration::Zero(), + " abort=", abort_request_.has_value()); } -void CallData::ResumeBatch(void* arg, grpc_error_handle error) { - grpc_call_element* elem = static_cast(arg); - auto* calld = static_cast(elem->call_data); - MutexLock lock(&calld->delay_mu_); - // Cancelled or canceller has already run - if (error == GRPC_ERROR_CANCELLED || - calld->resume_batch_canceller_ == nullptr) { - return; - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: Resuming delayed stream op batch %p", - elem->channel_data, calld, calld->delayed_batch_); - } - // Lame the canceller - calld->resume_batch_canceller_ = nullptr; - // Finish fault injection. - calld->FaultInjectionFinished(); - // Abort if needed. - error = calld->MaybeAbort(); - if (error != GRPC_ERROR_NONE) { - calld->abort_error_ = error; - grpc_transport_stream_op_batch_finish_with_failure( - calld->delayed_batch_, GRPC_ERROR_REF(calld->abort_error_), - calld->call_combiner_); - return; - } - // Chain to the next filter. - grpc_call_next_op(elem, calld->delayed_batch_); -} - -void CallData::HijackedRecvTrailingMetadataReady(void* arg, - grpc_error_handle error) { - grpc_call_element* elem = static_cast(arg); - auto* calld = static_cast(elem->call_data); - if (calld->abort_error_ != GRPC_ERROR_NONE) { - error = grpc_error_add_child(GRPC_ERROR_REF(error), - GRPC_ERROR_REF(calld->abort_error_)); - } else { - error = GRPC_ERROR_REF(error); - } - Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, - error); -} - -} // namespace - -extern const grpc_channel_filter FaultInjectionFilterVtable = { - CallData::StartTransportStreamOpBatch, - nullptr, - grpc_channel_next_op, - sizeof(CallData), - CallData::Init, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - CallData::Destroy, - sizeof(ChannelData), - ChannelData::Init, - ChannelData::Destroy, - grpc_channel_next_get_info, - "fault_injection_filter", -}; +const grpc_channel_filter FaultInjectionFilter::kFilter = + MakePromiseBasedFilter( + "fault_injection_filter"); void FaultInjectionFilterRegister(CoreConfiguration::Builder* builder) { FaultInjectionServiceConfigParser::Register(builder); diff --git a/src/core/ext/filters/fault_injection/fault_injection_filter.h b/src/core/ext/filters/fault_injection/fault_injection_filter.h index fbc1eb0cf9e..49c6c0e31a8 100644 --- a/src/core/ext/filters/fault_injection/fault_injection_filter.h +++ b/src/core/ext/filters/fault_injection/fault_injection_filter.h @@ -21,6 +21,8 @@ #include "src/core/ext/filters/fault_injection/service_config_parser.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/transport/transport.h" // Channel arg key for enabling parsing fault injection via method config. #define GRPC_ARG_PARSE_FAULT_INJECTION_METHOD_CONFIG \ @@ -32,7 +34,28 @@ namespace grpc_core { // of the ordinary channel stack. The fault injection filter fetches fault // injection policy from the method config of service config returned by the // resolver, and enforces the fault injection policy. -extern const grpc_channel_filter FaultInjectionFilterVtable; +class FaultInjectionFilter : public ChannelFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + ChannelArgs args, ChannelFilter::Args filter_args); + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + private: + explicit FaultInjectionFilter(ChannelFilter::Args filter_args); + + class InjectionDecision; + InjectionDecision MakeInjectionDecision( + const ClientMetadataHandle& initial_metadata); + + // The relative index of instances of the same filter. + int index_; + const size_t service_config_parser_index_; +}; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_http_fault_filter.cc b/src/core/ext/xds/xds_http_fault_filter.cc index 015556faaec..4c7e826cc1e 100644 --- a/src/core/ext/xds/xds_http_fault_filter.cc +++ b/src/core/ext/xds/xds_http_fault_filter.cc @@ -198,7 +198,7 @@ XdsHttpFaultFilter::GenerateFilterConfigOverride( } const grpc_channel_filter* XdsHttpFaultFilter::channel_filter() const { - return &FaultInjectionFilterVtable; + return &FaultInjectionFilter::kFilter; } grpc_channel_args* XdsHttpFaultFilter::ModifyChannelArgs( diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index f65ab3be18d..58c7286ff3d 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -43,15 +43,20 @@ class ChannelFilter { public: class Args { public: - Args() : Args(nullptr) {} - explicit Args(grpc_channel_stack* channel_stack) - : channel_stack_(channel_stack) {} + Args() : Args(nullptr, nullptr) {} + explicit Args(grpc_channel_stack* channel_stack, + grpc_channel_element* channel_element) + : channel_stack_(channel_stack), channel_element_(channel_element) {} grpc_channel_stack* channel_stack() const { return channel_stack_; } + grpc_channel_element* uninitialized_channel_element() { + return channel_element_; + } private: friend class ChannelFilter; grpc_channel_stack* channel_stack_; + grpc_channel_element* channel_element_; }; // Construct a promise for one call. @@ -411,7 +416,7 @@ MakePromiseBasedFilter(const char* name) { [](grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); auto status = F::Create(ChannelArgs::FromC(args->channel_args), - ChannelFilter::Args(args->channel_stack)); + ChannelFilter::Args(args->channel_stack, elem)); if (!status.ok()) return absl_status_to_grpc_error(status.status()); new (elem->channel_data) F(std::move(*status)); return GRPC_ERROR_NONE;