[promises] Expand filter fuzzer to contain a channel stack. (#29513)

* sketch

* Eliminate post-init in channel stack builder

We've had a post init function on channel stack builder for a very long
time, an it serves to run some code after initialization completes.

We need the functionality for a few things, but the function passed in
is intimately tied to the filter in use - we never vary it between
multiple functions for the same filter... which means it makes more
sense to locate this functionality as part of the filter interface.

* fix

* Automated change: Fix sanity tests

* introduce-channel-stack

* introduce-channel-stack

* Automated change: Fix sanity tests

* fix

* fix

* fix

* fixes

* Fix

* fix

* fix

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* review feedback

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29600/head
Craig Tiller 3 years ago committed by GitHub
parent 35272aafee
commit 92609abed0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 12
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  3. 3
      src/core/ext/filters/channel_idle/channel_idle_filter.h
  4. 32
      src/core/lib/channel/channel_stack.cc
  5. 4
      src/core/lib/channel/channel_stack.h
  6. 1
      src/core/lib/channel/promise_based_filter.cc
  7. 30
      src/core/lib/channel/promise_based_filter.h
  8. 9
      src/core/lib/promise/activity.h
  9. 166
      test/core/filters/filter_fuzzer.cc
  10. 2
      test/core/filters/filter_fuzzer.proto

@ -2204,6 +2204,7 @@ grpc_cc_library(
"latch",
"memory_quota",
"orphanable",
"poll",
"promise",
"ref_counted",
"ref_counted_ptr",

@ -259,16 +259,12 @@ void ChannelIdleFilter::CloseChannel() {
elem->filter->start_transport_op(elem, op);
}
namespace {
const grpc_channel_filter grpc_client_idle_filter =
const grpc_channel_filter ClientIdleFilter::kFilter =
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>(
"client_idle");
const grpc_channel_filter grpc_max_age_filter =
const grpc_channel_filter MaxAgeFilter::kFilter =
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age");
} // namespace
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
@ -276,7 +272,7 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
GetClientIdleTimeout(channel_args) != Duration::Infinity()) {
builder->PrependFilter(&grpc_client_idle_filter);
builder->PrependFilter(&ClientIdleFilter::kFilter);
}
return true;
});
@ -286,7 +282,7 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
MaxAgeFilter::Config::FromChannelArgs(channel_args).enable()) {
builder->PrependFilter(&grpc_max_age_filter);
builder->PrependFilter(&MaxAgeFilter::kFilter);
}
return true;
});

@ -75,6 +75,8 @@ class ChannelIdleFilter : public ChannelFilter {
class ClientIdleFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientIdleFilter> Create(
ChannelArgs args, ChannelFilter::Args filter_args);
@ -84,6 +86,7 @@ class ClientIdleFilter final : public ChannelIdleFilter {
class MaxAgeFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
struct Config;
static absl::StatusOr<MaxAgeFilter> Create(ChannelArgs args,

@ -22,9 +22,12 @@
#include <stdint.h>
#include <utility>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/promise/poll.h"
grpc_core::TraceFlag grpc_trace_channel(false, "channel");
grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack");
@ -278,3 +281,32 @@ grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem) {
void grpc_channel_stack_no_post_init(grpc_channel_stack*,
grpc_channel_element*) {}
namespace {
grpc_core::NextPromiseFactory ClientNext(grpc_channel_element* elem) {
return [elem](grpc_core::CallArgs args) {
return elem->filter->make_call_promise(elem, std::move(args),
ClientNext(elem + 1));
};
}
grpc_core::NextPromiseFactory ServerNext(grpc_channel_element* elem) {
return [elem](grpc_core::CallArgs args) {
return elem->filter->make_call_promise(elem, std::move(args),
ServerNext(elem - 1));
};
}
} // namespace
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
grpc_channel_stack::MakeCallPromise(grpc_core::CallArgs call_args) {
if (is_client) {
return ClientNext(grpc_channel_stack_element(this, 0))(
std::move(call_args));
} else {
return ServerNext(grpc_channel_stack_element(this, this->count - 1))(
std::move(call_args));
}
}

@ -214,6 +214,7 @@ struct grpc_call_element {
guarantees they live within a single malloc() allocation */
struct grpc_channel_stack {
grpc_stream_refcount refcount;
bool is_client;
size_t count;
/* Memory required for a call stack (computed at channel stack
initialization) */
@ -235,6 +236,9 @@ struct grpc_channel_stack {
IncrementRefCount();
return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args);
};
/* A call stack tracks a set of related filters for one call, and guarantees

@ -16,7 +16,6 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include <cstdlib>
#include <memory>
#include <string>

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <stdlib.h>
#include <atomic>
#include <new>
@ -109,6 +110,16 @@ static constexpr uint8_t kFilterExaminesServerInitialMetadata = 1;
namespace promise_filter_detail {
// Proxy channel filter for initialization failure, since we must leave a valid
// filter in place.
class InvalidChannelFilter : public ChannelFilter {
public:
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs, NextPromiseFactory) override {
abort();
}
};
// Call data shared between all implementations of promise-based filters.
class BaseCallData : public Activity, private Wakeable {
public:
@ -463,13 +474,14 @@ MakePromiseBasedFilter(const char* name) {
// make_call_promise
[](grpc_channel_element* elem, CallArgs call_args,
NextPromiseFactory next_promise_factory) {
return static_cast<F*>(elem->channel_data)
return static_cast<ChannelFilter*>(elem->channel_data)
->MakeCallPromise(std::move(call_args),
std::move(next_promise_factory));
},
// start_transport_op
[](grpc_channel_element* elem, grpc_transport_op* op) {
if (!static_cast<F*>(elem->channel_data)->StartTransportOp(op)) {
if (!static_cast<ChannelFilter*>(elem->channel_data)
->StartTransportOp(op)) {
grpc_channel_next_op(elem, op);
}
},
@ -495,20 +507,26 @@ MakePromiseBasedFilter(const char* name) {
sizeof(F),
// init_channel_elem
[](grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
auto status = F::Create(ChannelArgs::FromC(args->channel_args),
ChannelFilter::Args(args->channel_stack, elem));
if (!status.ok()) return absl_status_to_grpc_error(status.status());
if (!status.ok()) {
static_assert(
sizeof(promise_filter_detail::InvalidChannelFilter) <= sizeof(F),
"InvalidChannelFilter must fit in F");
new (elem->channel_data)
promise_filter_detail::InvalidChannelFilter();
return absl_status_to_grpc_error(status.status());
}
new (elem->channel_data) F(std::move(*status));
return GRPC_ERROR_NONE;
},
// post_init_channel_elem
[](grpc_channel_stack*, grpc_channel_element* elem) {
static_cast<F*>(elem->channel_data)->PostInit();
static_cast<ChannelFilter*>(elem->channel_data)->PostInit();
},
// destroy_channel_elem
[](grpc_channel_element* elem) {
static_cast<F*>(elem->channel_data)->~F();
static_cast<ChannelFilter*>(elem->channel_data)->~ChannelFilter();
},
// get_channel_info
grpc_channel_next_get_info,

@ -150,13 +150,16 @@ class Activity : public Orphanable {
// Set the current activity at construction, clean it up at destruction.
class ScopedActivity {
public:
explicit ScopedActivity(Activity* activity) {
GPR_ASSERT(g_current_activity_ == nullptr);
explicit ScopedActivity(Activity* activity)
: prior_activity_(g_current_activity_) {
g_current_activity_ = activity;
}
~ScopedActivity() { g_current_activity_ = nullptr; }
~ScopedActivity() { g_current_activity_ = prior_activity_; }
ScopedActivity(const ScopedActivity&) = delete;
ScopedActivity& operator=(const ScopedActivity&) = delete;
private:
Activity* const prior_activity_;
};
private:

@ -20,6 +20,7 @@
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/client_authority_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
@ -164,24 +165,6 @@ struct GlobalObjects {
}
};
struct Filter {
absl::string_view name;
absl::StatusOr<std::unique_ptr<ChannelFilter>> (*create)(
ChannelArgs channel_args, ChannelFilter::Args filter_args);
template <typename T>
static Filter* Make(absl::string_view name) {
return new Filter{
name,
[](ChannelArgs channel_args, ChannelFilter::Args filter_args)
-> absl::StatusOr<std::unique_ptr<ChannelFilter>> {
auto r = T::Create(channel_args, filter_args);
if (!r.ok()) return r.status();
return std::unique_ptr<ChannelFilter>(new T(std::move(*r)));
}};
}
};
RefCountedPtr<AuthorizationEngine> LoadAuthorizationEngine(
const filter_fuzzer::AuthorizationEngine& engine) {
switch (engine.engine_case()) {
@ -256,42 +239,36 @@ ChannelArgs LoadChannelArgs(const FuzzerChannelArgs& fuzz_args,
return args;
}
#define MAKE_FILTER(name) Filter::Make<name>(#name)
const Filter* const kFilters[] = {
MAKE_FILTER(ClientAuthorityFilter), MAKE_FILTER(HttpClientFilter),
MAKE_FILTER(ClientAuthFilter), MAKE_FILTER(GrpcServerAuthzFilter),
MAKE_FILTER(HttpServerFilter),
const grpc_channel_filter* const kFilters[] = {
&ClientAuthorityFilter::kFilter, &HttpClientFilter::kFilter,
&ClientAuthFilter::kFilter, &GrpcServerAuthzFilter::kFilterVtable,
&MaxAgeFilter::kFilter, &ClientIdleFilter::kFilter,
&HttpServerFilter::kFilter,
// We exclude this one internally, so we can't have it here - will need to
// pick it up through some future registration mechanism.
// MAKE_FILTER(ServerLoadReportingFilter),
// The following need channel stacks, and that's not figured out yet
// MAKE_FILTER(MaxAgeFilter),
// MAKE_FILTER(ClientIdleFilter),
};
absl::StatusOr<std::unique_ptr<ChannelFilter>> CreateFilter(
absl::string_view name, ChannelArgs channel_args,
ChannelFilter::Args filter_args) {
const grpc_channel_filter* FindFilter(absl::string_view name) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(kFilters); ++i) {
if (name == kFilters[i]->name) {
return kFilters[i]->create(std::move(channel_args), filter_args);
}
if (name == kFilters[i]->name) return kFilters[i];
}
return absl::NotFoundError(absl::StrCat("Filter ", name, " not found"));
return nullptr;
}
class MainLoop {
public:
MainLoop(std::unique_ptr<ChannelFilter> filter, ChannelArgs channel_args)
MainLoop(RefCountedPtr<grpc_channel_stack> channel_stack,
ChannelArgs channel_args)
: memory_allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("test")),
filter_(std::move(filter)) {}
channel_stack_(std::move(channel_stack)) {}
~MainLoop() {
ExecCtx exec_ctx;
calls_.clear();
channel_stack_.reset();
}
void Run(const filter_fuzzer::Action& action, GlobalObjects* globals) {
@ -335,6 +312,26 @@ class MainLoop {
}
}
static const grpc_channel_filter* EndFilter(bool is_client) {
static const grpc_channel_filter client_filter =
MakePromiseBasedFilter<Call::EndFilter, FilterEndpoint::kClient>(
"client-end");
static const grpc_channel_filter server_filter =
MakePromiseBasedFilter<Call::EndFilter, FilterEndpoint::kServer>(
"server-end");
return is_client ? &client_filter : &server_filter;
}
static const grpc_channel_filter* BottomFilter(bool is_client) {
static const grpc_channel_filter client_filter =
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kClient>(
"client-end");
static const grpc_channel_filter server_filter =
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kServer>(
"server-end");
return is_client ? &client_filter : &server_filter;
}
private:
class WakeCall final : public Wakeable {
public:
@ -356,27 +353,62 @@ class MainLoop {
class Call final : public Activity {
public:
// EndFilter is the last filter that will be invoked for a call
class EndFilter : public ChannelFilter {
public:
static absl::StatusOr<EndFilter> Create(ChannelArgs,
ChannelFilter::Args) {
return EndFilter{};
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory) override {
Call* call = static_cast<Call*>(Activity::current());
if (call->server_initial_metadata_) {
call_args.server_initial_metadata->Set(
call->server_initial_metadata_.get());
} else {
call->unset_incoming_server_initial_metadata_latch_ =
call_args.server_initial_metadata;
}
return [call]() -> Poll<ServerMetadataHandle> {
return call->CheckCompletion();
};
}
};
// BottomFilter is the last filter on a channel stack (for sinking ops)
class BottomFilter : public ChannelFilter {
public:
static absl::StatusOr<BottomFilter> Create(ChannelArgs,
ChannelFilter::Args) {
return BottomFilter{};
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next) override {
return next(std::move(call_args));
}
bool StartTransportOp(grpc_transport_op* op) override {
GRPC_ERROR_UNREF(op->disconnect_with_error);
GRPC_ERROR_UNREF(op->goaway_error);
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
return true;
}
};
Call(MainLoop* main_loop, uint32_t id,
const filter_fuzzer::Metadata& client_initial_metadata)
: main_loop_(main_loop), id_(id) {
ScopedContext context(this);
auto* server_initial_metadata = arena_->New<Latch<ServerMetadata*>>();
promise_ = main_loop_->filter_->MakeCallPromise(
promise_ = main_loop_->channel_stack_->MakeCallPromise(
CallArgs{std::move(*LoadMetadata(client_initial_metadata,
&client_initial_metadata_)),
server_initial_metadata},
[this](CallArgs call_args) -> ArenaPromise<ServerMetadataHandle> {
if (server_initial_metadata_) {
call_args.server_initial_metadata->Set(
server_initial_metadata_.get());
} else {
unset_incoming_server_initial_metadata_latch_ =
call_args.server_initial_metadata;
}
return [this]() -> Poll<ServerMetadataHandle> {
return CheckCompletion();
};
});
server_initial_metadata});
Step();
}
@ -535,7 +567,7 @@ class MainLoop {
}
MemoryAllocator memory_allocator_;
std::unique_ptr<ChannelFilter> filter_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
std::map<uint32_t, std::unique_ptr<Call>> calls_;
std::vector<uint32_t> wakeups_;
};
@ -544,6 +576,13 @@ class MainLoop {
} // namespace grpc_core
DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) {
const grpc_channel_filter* filter = grpc_core::FindFilter(msg.filter());
if (filter == nullptr) return;
if (msg.channel_stack_type() < 0 ||
msg.channel_stack_type() >= GRPC_NUM_CHANNEL_STACK_TYPES) {
return;
}
grpc_test_only_set_slice_hash_seed(0);
char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
@ -560,10 +599,27 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) {
grpc_core::GlobalObjects globals;
auto channel_args = grpc_core::LoadChannelArgs(msg.channel_args(), &globals);
auto filter = grpc_core::CreateFilter(msg.filter(), channel_args,
grpc_core::ChannelFilter::Args());
if (filter.ok()) {
grpc_core::MainLoop main_loop(std::move(*filter), channel_args);
grpc_core::ChannelStackBuilderImpl builder(
msg.stack_name().c_str(),
static_cast<grpc_channel_stack_type>(msg.channel_stack_type()));
builder.SetChannelArgs(channel_args);
builder.AppendFilter(filter);
const bool is_client =
grpc_channel_stack_type_is_client(builder.channel_stack_type());
if (is_client) {
builder.AppendFilter(grpc_core::MainLoop::EndFilter(true));
} else {
builder.PrependFilter(grpc_core::MainLoop::EndFilter(false));
}
builder.AppendFilter(grpc_core::MainLoop::BottomFilter(is_client));
auto stack = [&]() {
grpc_core::ExecCtx exec_ctx;
return builder.Build();
}();
if (stack.ok()) {
grpc_core::MainLoop main_loop(std::move(*stack), std::move(channel_args));
for (const auto& action : msg.actions()) {
grpc_timer_manager_tick();
main_loop.Run(action, &globals);

@ -96,6 +96,8 @@ message Action {
message Msg {
string filter = 1;
int32 channel_stack_type = 3;
string stack_name = 4;
repeated ChannelArg channel_args = 2;
repeated Action actions = 100;
}

Loading…
Cancel
Save