pull/35420/head
Craig Tiller 11 months ago
parent 2c18d16475
commit 1a7cb7aef3
  1. 30
      src/core/ext/filters/fault_injection/fault_injection_filter.cc
  2. 17
      src/core/ext/filters/fault_injection/fault_injection_filter.h

@ -54,6 +54,11 @@
namespace grpc_core {
TraceFlag grpc_fault_injection_filter_trace(false, "fault_injection_filter");
const NoInterceptor FaultInjectionFilter::Call::OnServerInitialMetadata;
const NoInterceptor FaultInjectionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor FaultInjectionFilter::Call::OnClientToServerMessage;
const NoInterceptor FaultInjectionFilter::Call::OnServerToClientMessage;
const NoInterceptor FaultInjectionFilter::Call::OnFinalize;
namespace {
@ -144,23 +149,22 @@ FaultInjectionFilter::FaultInjectionFilter(ChannelFilter::Args filter_args)
mu_(new Mutex) {}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> FaultInjectionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decision = MakeInjectionDecision(call_args.client_initial_metadata);
ArenaPromise<absl::Status> FaultInjectionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, FaultInjectionFilter* filter) {
auto decision = filter->MakeInjectionDecision(md);
if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
gpr_log(GPR_INFO, "chand=%p: Fault injection triggered %s", this,
decision.ToString().c_str());
}
auto delay = decision.DelayUntil();
return TrySeq(
Sleep(delay),
[decision = std::move(decision)]() { return decision.MaybeAbort(); },
next_promise_factory(std::move(call_args)));
return TrySeq(Sleep(delay), [decision = std::move(decision)]() {
return decision.MaybeAbort();
});
}
FaultInjectionFilter::InjectionDecision
FaultInjectionFilter::MakeInjectionDecision(
const ClientMetadataHandle& initial_metadata) {
const ClientMetadata& initial_metadata) {
// Fetch the fault injection policy from the service config, based on the
// relative index for which policy should this CallData use.
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
@ -188,15 +192,15 @@ FaultInjectionFilter::MakeInjectionDecision(
!fi_policy->delay_percentage_header.empty()) {
std::string buffer;
if (!fi_policy->abort_code_header.empty() && abort_code == GRPC_STATUS_OK) {
auto value = initial_metadata->GetStringValue(
fi_policy->abort_code_header, &buffer);
auto value = initial_metadata.GetStringValue(fi_policy->abort_code_header,
&buffer);
if (value.has_value()) {
grpc_status_code_from_int(
AsInt<int>(*value).value_or(GRPC_STATUS_UNKNOWN), &abort_code);
}
}
if (!fi_policy->abort_percentage_header.empty()) {
auto value = initial_metadata->GetStringValue(
auto value = initial_metadata.GetStringValue(
fi_policy->abort_percentage_header, &buffer);
if (value.has_value()) {
abort_percentage_numerator = std::min(
@ -205,14 +209,14 @@ FaultInjectionFilter::MakeInjectionDecision(
}
if (!fi_policy->delay_header.empty() && delay == Duration::Zero()) {
auto value =
initial_metadata->GetStringValue(fi_policy->delay_header, &buffer);
initial_metadata.GetStringValue(fi_policy->delay_header, &buffer);
if (value.has_value()) {
delay = Duration::Milliseconds(
std::max(AsInt<int64_t>(*value).value_or(0), int64_t{0}));
}
}
if (!fi_policy->delay_percentage_header.empty()) {
auto value = initial_metadata->GetStringValue(
auto value = initial_metadata.GetStringValue(
fi_policy->delay_percentage_header, &buffer);
if (value.has_value()) {
delay_percentage_numerator = std::min(

@ -40,7 +40,8 @@ namespace grpc_core {
// of the ordinary channel stack. The fault injection filter fetches fault
// injection policy from the method config of service config returned by the
// resolver, and enforces the fault injection policy.
class FaultInjectionFilter : public ChannelFilter {
class FaultInjectionFilter
: public ImplementChannelFilter<FaultInjectionFilter> {
public:
static const grpc_channel_filter kFilter;
@ -48,15 +49,23 @@ class FaultInjectionFilter : public ChannelFilter {
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
ArenaPromise<absl::Status> OnClientInitialMetadata(
ClientMetadata& md, FaultInjectionFilter* filter);
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
private:
explicit FaultInjectionFilter(ChannelFilter::Args filter_args);
class InjectionDecision;
InjectionDecision MakeInjectionDecision(
const ClientMetadataHandle& initial_metadata);
const ClientMetadata& initial_metadata);
// The relative index of instances of the same filter.
size_t index_;

Loading…
Cancel
Save