Roll forward fault injection --> promises (#29452)

* Revert "Revert "Fault injection filter -> promises (#29288)" (#29395)"

This reverts commit 14cdbc68af.

* fix refcounting
pull/29458/head
Craig Tiller 3 years ago committed by GitHub
parent 5d843c482c
commit 9f65dfed5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 494
      src/core/ext/filters/fault_injection/fault_injection_filter.cc
  3. 25
      src/core/ext/filters/fault_injection/fault_injection_filter.h
  4. 2
      src/core/ext/xds/xds_http_fault_filter.cc
  5. 13
      src/core/lib/channel/promise_based_filter.h

@ -2694,10 +2694,13 @@ grpc_cc_library(
external_deps = ["absl/strings"],
language = "c++",
deps = [
"capture",
"gpr_base",
"grpc_base",
"grpc_service_config",
"json_util",
"sleep",
"try_seq",
],
)

@ -29,11 +29,12 @@
#include "src/core/ext/filters/fault_injection/service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/gprpp/capture.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
@ -62,434 +63,193 @@ inline bool UnderFraction(const uint32_t numerator,
return random_number < numerator;
}
class ChannelData {
// Tracks an active faults lifetime.
// Increments g_active_faults when created, and decrements it when destroyed.
class FaultHandle {
public:
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
int index() const { return index_; }
size_t service_config_parser_index() const {
return service_config_parser_index_;
explicit FaultHandle(bool active) : active_(active) {
if (active) {
g_active_faults.fetch_add(1, std::memory_order_relaxed);
}
}
~FaultHandle() {
if (active_) {
g_active_faults.fetch_sub(1, std::memory_order_relaxed);
}
}
FaultHandle(const FaultHandle&) = delete;
FaultHandle& operator=(const FaultHandle&) = delete;
FaultHandle(FaultHandle&& other) noexcept
: active_(absl::exchange(other.active_, false)) {}
FaultHandle& operator=(FaultHandle&& other) noexcept {
std::swap(active_, other.active_);
return *this;
}
private:
ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args);
~ChannelData() = default;
// The relative index of instances of the same filter.
int index_;
const size_t service_config_parser_index_;
bool active_;
};
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
} // namespace
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*then_schedule_closure*/);
class FaultInjectionFilter::InjectionDecision {
public:
InjectionDecision(uint32_t max_faults, Duration delay_time,
absl::optional<absl::Status> abort_request)
: max_faults_(max_faults),
delay_time_(delay_time),
abort_request_(abort_request) {}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
std::string ToString() const;
Timestamp DelayUntil();
absl::Status MaybeAbort() const;
private:
class ResumeBatchCanceller;
bool HaveActiveFaultsQuota() const;
CallData(grpc_call_element* elem, const grpc_call_element_args* args);
~CallData();
void DecideWhetherToInjectFaults(grpc_metadata_batch* initial_metadata);
// Checks if current active faults exceed the allowed max faults.
bool HaveActiveFaultsQuota(bool increment);
// Returns true if this RPC needs to be delayed. If so, this call will be
// counted as an active fault.
bool MaybeDelay();
// Returns the aborted RPC status if this RPC needs to be aborted. If so,
// this call will be counted as an active fault. Otherwise, it returns
// GRPC_ERROR_NONE.
// If this call is already been delay injected, skip the active faults
// quota check.
grpc_error_handle MaybeAbort();
// Delays the stream operations batch.
void DelayBatch(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch);
// Cancels the delay timer.
void CancelDelayTimer() { grpc_timer_cancel(&delay_timer_); }
// Finishes the fault injection, should only be called once.
void FaultInjectionFinished() {
g_active_faults.fetch_sub(1, std::memory_order_relaxed);
}
// This is a callback that will be invoked after the delay timer is up.
static void ResumeBatch(void* arg, grpc_error_handle error);
// This is a callback invoked upon completion of recv_trailing_metadata.
// Injects the abort_error_ to the recv_trailing_metadata batch if needed.
static void HijackedRecvTrailingMetadataReady(void* arg, grpc_error_handle);
// Used to track the policy structs that needs to be destroyed in dtor.
bool fi_policy_owned_ = false;
const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy_;
grpc_call_stack* owning_call_;
Arena* arena_;
CallCombiner* call_combiner_;
// Indicates whether we are doing a delay and/or an abort for this call.
bool delay_request_ = false;
bool abort_request_ = false;
// Delay states
grpc_timer delay_timer_ ABSL_GUARDED_BY(delay_mu_);
ResumeBatchCanceller* resume_batch_canceller_ ABSL_GUARDED_BY(delay_mu_);
grpc_transport_stream_op_batch* delayed_batch_ ABSL_GUARDED_BY(delay_mu_);
// Abort states
grpc_error_handle abort_error_ = GRPC_ERROR_NONE;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_;
// Protects the asynchronous delay, resume, and cancellation.
Mutex delay_mu_;
uint32_t max_faults_;
Duration delay_time_;
absl::optional<absl::Status> abort_request_;
FaultHandle active_fault_{false};
};
// ChannelData
grpc_error_handle ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(elem->filter == &FaultInjectionFilterVtable);
new (elem->channel_data) ChannelData(elem, args);
return GRPC_ERROR_NONE;
}
void ChannelData::Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
absl::StatusOr<FaultInjectionFilter> FaultInjectionFilter::Create(
ChannelArgs, ChannelFilter::Args filter_args) {
return FaultInjectionFilter(filter_args);
}
ChannelData::ChannelData(grpc_channel_element* elem,
grpc_channel_element_args* args)
: index_(
grpc_channel_stack_filter_instance_number(args->channel_stack, elem)),
FaultInjectionFilter::FaultInjectionFilter(ChannelFilter::Args filter_args)
: index_(grpc_channel_stack_filter_instance_number(
filter_args.channel_stack(),
filter_args.uninitialized_channel_element())),
service_config_parser_index_(
FaultInjectionServiceConfigParser::ParserIndex()) {}
// CallData::ResumeBatchCanceller
class CallData::ResumeBatchCanceller {
public:
explicit ResumeBatchCanceller(grpc_call_element* elem) : elem_(elem) {
auto* calld = static_cast<CallData*>(elem->call_data);
GRPC_CALL_STACK_REF(calld->owning_call_, "ResumeBatchCanceller");
GRPC_CLOSURE_INIT(&closure_, &Cancel, this, grpc_schedule_on_exec_ctx);
calld->call_combiner_->SetNotifyOnCancel(&closure_);
}
private:
static void Cancel(void* arg, grpc_error_handle error) {
auto* self = static_cast<ResumeBatchCanceller*>(arg);
auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
auto* calld = static_cast<CallData*>(self->elem_->call_data);
{
MutexLock lock(&calld->delay_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: cancelling schdueled pick: "
"error=%s self=%p calld->resume_batch_canceller_=%p",
chand, calld, grpc_error_std_string(error).c_str(), self,
calld->resume_batch_canceller_);
}
if (error != GRPC_ERROR_NONE && calld->resume_batch_canceller_ == self) {
// Cancel the delayed pick.
calld->CancelDelayTimer();
calld->FaultInjectionFinished();
// Fail pending batches on the call.
grpc_transport_stream_op_batch_finish_with_failure(
calld->delayed_batch_, GRPC_ERROR_REF(error),
calld->call_combiner_);
}
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResumeBatchCanceller");
delete self;
}
grpc_call_element* elem_;
grpc_closure closure_;
};
// CallData
grpc_error_handle CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
auto* calld = new (elem->call_data) CallData(elem, args);
if (calld->fi_policy_ == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to find fault injection policy");
}
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*then_schedule_closure*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
void CallData::StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* calld = static_cast<CallData*>(elem->call_data);
// There should only be one send_initial_metdata op, and fault injection also
// only need to be enforced once.
if (batch->send_initial_metadata) {
calld->DecideWhetherToInjectFaults(
batch->payload->send_initial_metadata.send_initial_metadata);
if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: Fault injection triggered delay=%d abort=%d",
elem->channel_data, calld, calld->delay_request_,
calld->abort_request_);
}
if (calld->MaybeDelay()) {
// Delay the batch, and pass down the batch in the scheduled closure.
calld->DelayBatch(elem, batch);
return;
}
grpc_error_handle abort_error = calld->MaybeAbort();
if (abort_error != GRPC_ERROR_NONE) {
calld->abort_error_ = abort_error;
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_);
return;
}
} else {
if (batch->recv_trailing_metadata) {
// Intercept recv_trailing_metadata callback so that we can inject the
// failure when aborting streaming calls, because their
// recv_trailing_metatdata op may not be on the same batch as the
// send_initial_metadata op.
calld->original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_;
}
if (calld->abort_error_ != GRPC_ERROR_NONE) {
// If we already decided to abort, then immediately fail this batch.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_);
return;
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> FaultInjectionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decision = MakeInjectionDecision(call_args.client_initial_metadata);
if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
gpr_log(GPR_INFO, "chand=%p: Fault injection triggered %s", this,
decision.ToString().c_str());
}
// Chain to the next filter.
grpc_call_next_op(elem, batch);
auto delay = decision.DelayUntil();
return TrySeq(
Sleep(delay),
Capture(
[](InjectionDecision* decision) { return decision->MaybeAbort(); },
std::move(decision)),
next_promise_factory(std::move(call_args)));
}
CallData::CallData(grpc_call_element* elem, const grpc_call_element_args* args)
: owning_call_(args->call_stack),
arena_(args->arena),
call_combiner_(args->call_combiner) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
FaultInjectionFilter::InjectionDecision
FaultInjectionFilter::MakeInjectionDecision(
const ClientMetadataHandle& initial_metadata) {
// Fetch the fault injection policy from the service config, based on the
// relative index for which policy should this CallData use.
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
args->context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
GetContext<
grpc_call_context_element>()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
.value);
auto* method_params = static_cast<FaultInjectionMethodParsedConfig*>(
service_config_call_data->GetMethodParsedConfig(
chand->service_config_parser_index()));
service_config_parser_index_));
const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy =
nullptr;
if (method_params != nullptr) {
fi_policy_ = method_params->fault_injection_policy(chand->index());
fi_policy = method_params->fault_injection_policy(index_);
}
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
HijackedRecvTrailingMetadataReady, elem,
grpc_schedule_on_exec_ctx);
}
CallData::~CallData() {
if (fi_policy_owned_) {
fi_policy_->~FaultInjectionPolicy();
}
GRPC_ERROR_UNREF(abort_error_);
}
grpc_status_code abort_code = fi_policy->abort_code;
uint32_t abort_percentage_numerator = fi_policy->abort_percentage_numerator;
uint32_t delay_percentage_numerator = fi_policy->delay_percentage_numerator;
Duration delay = fi_policy->delay;
void CallData::DecideWhetherToInjectFaults(
grpc_metadata_batch* initial_metadata) {
FaultInjectionMethodParsedConfig::FaultInjectionPolicy* copied_policy =
nullptr;
// Update the policy with values in initial metadata.
if (!fi_policy_->abort_code_header.empty() ||
!fi_policy_->abort_percentage_header.empty() ||
!fi_policy_->delay_header.empty() ||
!fi_policy_->delay_percentage_header.empty()) {
// Defer the actual copy until the first matched header.
auto maybe_copy_policy_func = [this, &copied_policy]() {
if (copied_policy == nullptr) {
copied_policy =
arena_->New<FaultInjectionMethodParsedConfig::FaultInjectionPolicy>(
*fi_policy_);
}
};
if (!fi_policy->abort_code_header.empty() ||
!fi_policy->abort_percentage_header.empty() ||
!fi_policy->delay_header.empty() ||
!fi_policy->delay_percentage_header.empty()) {
std::string buffer;
if (!fi_policy_->abort_code_header.empty() &&
(copied_policy == nullptr ||
copied_policy->abort_code == GRPC_STATUS_OK)) {
if (!fi_policy->abort_code_header.empty() && abort_code == GRPC_STATUS_OK) {
auto value = initial_metadata->GetStringValue(
fi_policy_->abort_code_header, &buffer);
fi_policy->abort_code_header, &buffer);
if (value.has_value()) {
maybe_copy_policy_func();
grpc_status_code_from_int(
AsInt<int>(*value).value_or(GRPC_STATUS_UNKNOWN),
&copied_policy->abort_code);
AsInt<int>(*value).value_or(GRPC_STATUS_UNKNOWN), &abort_code);
}
}
if (!fi_policy_->abort_percentage_header.empty()) {
if (!fi_policy->abort_percentage_header.empty()) {
auto value = initial_metadata->GetStringValue(
fi_policy_->abort_percentage_header, &buffer);
fi_policy->abort_percentage_header, &buffer);
if (value.has_value()) {
maybe_copy_policy_func();
copied_policy->abort_percentage_numerator =
std::min(AsInt<uint32_t>(*value).value_or(-1),
fi_policy_->abort_percentage_numerator);
abort_percentage_numerator = std::min(
AsInt<uint32_t>(*value).value_or(-1), abort_percentage_numerator);
}
}
if (!fi_policy_->delay_header.empty() &&
(copied_policy == nullptr ||
copied_policy->delay == Duration::Zero())) {
if (!fi_policy->delay_header.empty() && delay == Duration::Zero()) {
auto value =
initial_metadata->GetStringValue(fi_policy_->delay_header, &buffer);
initial_metadata->GetStringValue(fi_policy->delay_header, &buffer);
if (value.has_value()) {
maybe_copy_policy_func();
copied_policy->delay = Duration::Milliseconds(
delay = Duration::Milliseconds(
std::max(AsInt<int64_t>(*value).value_or(0), int64_t(0)));
}
}
if (!fi_policy_->delay_percentage_header.empty()) {
if (!fi_policy->delay_percentage_header.empty()) {
auto value = initial_metadata->GetStringValue(
fi_policy_->delay_percentage_header, &buffer);
fi_policy->delay_percentage_header, &buffer);
if (value.has_value()) {
maybe_copy_policy_func();
copied_policy->delay_percentage_numerator =
std::min(AsInt<uint32_t>(*value).value_or(-1),
fi_policy_->delay_percentage_numerator);
delay_percentage_numerator = std::min(
AsInt<uint32_t>(*value).value_or(-1), delay_percentage_numerator);
}
}
if (copied_policy != nullptr) fi_policy_ = copied_policy;
}
// Roll the dice
delay_request_ = fi_policy_->delay != Duration::Zero() &&
UnderFraction(fi_policy_->delay_percentage_numerator,
fi_policy_->delay_percentage_denominator);
abort_request_ = fi_policy_->abort_code != GRPC_STATUS_OK &&
UnderFraction(fi_policy_->abort_percentage_numerator,
fi_policy_->abort_percentage_denominator);
if (!delay_request_ && !abort_request_) {
if (copied_policy != nullptr) copied_policy->~FaultInjectionPolicy();
// No fault injection for this call
} else {
fi_policy_owned_ = copied_policy != nullptr;
}
}
bool CallData::HaveActiveFaultsQuota(bool increment) {
if (g_active_faults.load(std::memory_order_acquire) >=
fi_policy_->max_faults) {
return false;
}
if (increment) g_active_faults.fetch_add(1, std::memory_order_relaxed);
return true;
const bool delay_request =
delay != Duration::Zero() &&
UnderFraction(delay_percentage_numerator,
fi_policy->delay_percentage_denominator);
const bool abort_request =
abort_code != GRPC_STATUS_OK &&
UnderFraction(abort_percentage_numerator,
fi_policy->abort_percentage_denominator);
return InjectionDecision(
fi_policy->max_faults, delay_request ? delay : Duration::Zero(),
abort_request ? absl::optional<absl::Status>(absl::Status(
static_cast<absl::StatusCode>(abort_code),
fi_policy->abort_message))
: absl::nullopt);
}
bool CallData::MaybeDelay() {
if (delay_request_) {
return HaveActiveFaultsQuota(true);
}
return false;
bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const {
return g_active_faults.load(std::memory_order_acquire) < max_faults_;
}
grpc_error_handle CallData::MaybeAbort() {
if (abort_request_ && (delay_request_ || HaveActiveFaultsQuota(false))) {
return grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(fi_policy_->abort_message.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, fi_policy_->abort_code);
Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() {
if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) {
active_fault_ = FaultHandle{true};
return ExecCtx::Get()->Now() + delay_time_;
}
return GRPC_ERROR_NONE;
}
void CallData::DelayBatch(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch) {
MutexLock lock(&delay_mu_);
delayed_batch_ = batch;
resume_batch_canceller_ = new ResumeBatchCanceller(elem);
Timestamp resume_time = ExecCtx::Get()->Now() + fi_policy_->delay;
GRPC_CLOSURE_INIT(&batch->handler_private.closure, ResumeBatch, elem,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&delay_timer_, resume_time, &batch->handler_private.closure);
return Timestamp::InfPast();
}
void CallData::ResumeBatch(void* arg, grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
auto* calld = static_cast<CallData*>(elem->call_data);
MutexLock lock(&calld->delay_mu_);
// Cancelled or canceller has already run
if (error == GRPC_ERROR_CANCELLED ||
calld->resume_batch_canceller_ == nullptr) {
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: Resuming delayed stream op batch %p",
elem->channel_data, calld, calld->delayed_batch_);
}
// Lame the canceller
calld->resume_batch_canceller_ = nullptr;
// Finish fault injection.
calld->FaultInjectionFinished();
// Abort if needed.
error = calld->MaybeAbort();
if (error != GRPC_ERROR_NONE) {
calld->abort_error_ = error;
grpc_transport_stream_op_batch_finish_with_failure(
calld->delayed_batch_, GRPC_ERROR_REF(calld->abort_error_),
calld->call_combiner_);
return;
absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort() const {
if (abort_request_.has_value() &&
(delay_time_ != Duration::Zero() || HaveActiveFaultsQuota())) {
return abort_request_.value();
}
// Chain to the next filter.
grpc_call_next_op(elem, calld->delayed_batch_);
return absl::OkStatus();
}
void CallData::HijackedRecvTrailingMetadataReady(void* arg,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
auto* calld = static_cast<CallData*>(elem->call_data);
if (calld->abort_error_ != GRPC_ERROR_NONE) {
error = grpc_error_add_child(GRPC_ERROR_REF(error),
GRPC_ERROR_REF(calld->abort_error_));
} else {
error = GRPC_ERROR_REF(error);
}
Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
error);
std::string FaultInjectionFilter::InjectionDecision::ToString() const {
return absl::StrCat("delay=", delay_time_ != Duration::Zero(),
" abort=", abort_request_.has_value());
}
} // namespace
extern const grpc_channel_filter FaultInjectionFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
grpc_channel_next_get_info,
"fault_injection_filter",
};
const grpc_channel_filter FaultInjectionFilter::kFilter =
MakePromiseBasedFilter<FaultInjectionFilter, FilterEndpoint::kClient>(
"fault_injection_filter");
void FaultInjectionFilterRegister(CoreConfiguration::Builder* builder) {
FaultInjectionServiceConfigParser::Register(builder);

@ -21,6 +21,8 @@
#include "src/core/ext/filters/fault_injection/service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/transport/transport.h"
// Channel arg key for enabling parsing fault injection via method config.
#define GRPC_ARG_PARSE_FAULT_INJECTION_METHOD_CONFIG \
@ -32,7 +34,28 @@ namespace grpc_core {
// of the ordinary channel stack. The fault injection filter fetches fault
// injection policy from the method config of service config returned by the
// resolver, and enforces the fault injection policy.
extern const grpc_channel_filter FaultInjectionFilterVtable;
class FaultInjectionFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<FaultInjectionFilter> Create(
ChannelArgs args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
explicit FaultInjectionFilter(ChannelFilter::Args filter_args);
class InjectionDecision;
InjectionDecision MakeInjectionDecision(
const ClientMetadataHandle& initial_metadata);
// The relative index of instances of the same filter.
int index_;
const size_t service_config_parser_index_;
};
} // namespace grpc_core

@ -198,7 +198,7 @@ XdsHttpFaultFilter::GenerateFilterConfigOverride(
}
const grpc_channel_filter* XdsHttpFaultFilter::channel_filter() const {
return &FaultInjectionFilterVtable;
return &FaultInjectionFilter::kFilter;
}
grpc_channel_args* XdsHttpFaultFilter::ModifyChannelArgs(

@ -43,15 +43,20 @@ class ChannelFilter {
public:
class Args {
public:
Args() : Args(nullptr) {}
explicit Args(grpc_channel_stack* channel_stack)
: channel_stack_(channel_stack) {}
Args() : Args(nullptr, nullptr) {}
explicit Args(grpc_channel_stack* channel_stack,
grpc_channel_element* channel_element)
: channel_stack_(channel_stack), channel_element_(channel_element) {}
grpc_channel_stack* channel_stack() const { return channel_stack_; }
grpc_channel_element* uninitialized_channel_element() {
return channel_element_;
}
private:
friend class ChannelFilter;
grpc_channel_stack* channel_stack_;
grpc_channel_element* channel_element_;
};
// Construct a promise for one call.
@ -411,7 +416,7 @@ MakePromiseBasedFilter(const char* name) {
[](grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
auto status = F::Create(ChannelArgs::FromC(args->channel_args),
ChannelFilter::Args(args->channel_stack));
ChannelFilter::Args(args->channel_stack, elem));
if (!status.ok()) return absl_status_to_grpc_error(status.status());
new (elem->channel_data) F(std::move(*status));
return GRPC_ERROR_NONE;

Loading…
Cancel
Save