new-ordering

pull/36993/head
Craig Tiller 8 months ago
parent 28ae52c9f1
commit 32bced3922
  1. 7
      src/core/lib/gprpp/unique_type_name.h
  2. 282
      src/core/lib/surface/channel_init.cc
  3. 8
      src/core/lib/surface/channel_init.h

@ -20,6 +20,7 @@
#include <string>
#include "absl/strings/string_view.h"
#include "ref_counted_ptr.h"
#include <grpc/support/port_platform.h>
@ -80,6 +81,12 @@ class UniqueTypeName {
return name_.data() < other.name_.data();
}
template <typename H>
friend H AbslHashValue(H h, const UniqueTypeName& name) {
return H::combine(std::move(h),
static_cast<const void*>(name.name_.data()));
}
int Compare(const UniqueTypeName& other) const {
return QsortCompare(name_.data(), other.name_.data());
}

@ -22,6 +22,7 @@
#include <algorithm>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <type_traits>
@ -39,6 +40,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/surface/channel_stack_type.h"
namespace grpc_core {
@ -112,6 +114,124 @@ ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
return *filters_[type].back();
}
class ChannelInit::DependencyTracker {
public:
// Declare that a filter exists.
void Declare(FilterRegistration* registration) {
nodes_.emplace(registration->name_, registration);
}
// Insert an edge from a to b
// Both nodes must be declared.
void InsertEdge(UniqueTypeName a, UniqueTypeName b) {
auto it_a = nodes_.find(a);
auto it_b = nodes_.find(b);
if (it_a == nodes_.end()) {
LOG(ERROR) << "gRPC Filter " << a.name()
<< " was not declared before adding an edge to " << b.name();
return;
}
if (it_b == nodes_.end()) {
LOG(ERROR) << "gRPC Filter " << b.name()
<< " was not declared before adding an edge from " << a.name();
return;
}
auto& node_a = it_a->second;
auto& node_b = it_b->second;
node_a.dependents.push_back(&node_b);
node_b.all_dependencies.push_back(a);
++node_b.waiting_dependencies;
}
// Finish the dependency graph and begin iteration.
void FinishDependencyMap() {
for (auto& p : nodes_) {
if (p.second.waiting_dependencies == 0) {
ready_dependencies_.emplace(&p.second);
}
}
}
FilterRegistration* Next() {
if (ready_dependencies_.empty()) {
CHECK_EQ(nodes_taken_, nodes_.size()) << "Unresolvable graph of channel "
"filters:\n"
<< GraphString();
return nullptr;
}
auto next = ready_dependencies_.top();
ready_dependencies_.pop();
if (next.node->ordering() != Ordering::kDefault) {
// Constraint: if we use ordering other than default, then we must have an
// unambiguous pick. If there is ambiguity, we must fix it by adding
// explicit ordering constraints.
CHECK_NE(next.node->ordering(),
ready_dependencies_.top().node->ordering())
<< "Ambiguous ordering between " << next.node->name() << " and "
<< ready_dependencies_.top().node->name();
}
for (Node* dependent : next.node->dependents) {
CHECK_GT(dependent->waiting_dependencies, 0u);
--dependent->waiting_dependencies;
if (dependent->waiting_dependencies == 0) {
ready_dependencies_.emplace(dependent);
}
}
++nodes_taken_;
return next.node->registration;
}
// Debug helper to dump the graph
std::string GraphString() const {
std::string result;
for (const auto& p : nodes_) {
absl::StrAppend(&result, p.first, " ->");
for (const auto& d : p.second.all_dependencies) {
absl::StrAppend(&result, " ", d);
}
absl::StrAppend(&result, "\n");
}
return result;
}
absl::Span<const UniqueTypeName> DependenciesFor(UniqueTypeName name) const {
auto it = nodes_.find(name);
CHECK(it != nodes_.end()) << "Filter " << name.name() << " not found";
return it->second.all_dependencies;
}
private:
struct Node {
explicit Node(FilterRegistration* registration)
: registration(registration) {}
// Nodes that depend on this node
std::vector<Node*> dependents;
// Nodes that this node depends on - for debugging purposes only
std::vector<UniqueTypeName> all_dependencies;
// The registration for this node
FilterRegistration* registration;
// Number of nodes this node is waiting on
size_t waiting_dependencies = 0;
Ordering ordering() const { return registration->ordering_; }
absl::string_view name() const { return registration->name_.name(); }
};
struct ReadyDependency {
explicit ReadyDependency(Node* node) : node(node) {}
Node* node;
bool operator<(const ReadyDependency& other) const {
// Sort first on ordering, and then lexically on name.
// The lexical sort means that the ordering is stable between builds
// (UniqueTypeName ordering is not stable between builds).
return node->ordering() < other.node->ordering() ||
(node->ordering() == other.node->ordering() &&
node->name() < other.node->name());
}
};
absl::flat_hash_map<UniqueTypeName, Node> nodes_;
std::priority_queue<ReadyDependency> ready_dependencies_;
size_t nodes_taken_ = 0;
};
ChannelInit::StackConfig ChannelInit::BuildStackConfig(
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
@ -122,24 +242,9 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
// ensure algorithm ordering stability is deterministic for a given build.
// We should not require this, but at the time of writing it's expected that
// this will help overall stability.
std::map<UniqueTypeName, FilterRegistration*> filter_to_registration;
using DependencyMap =
std::map<UniqueTypeName,
std::set<UniqueTypeName, CompareChannelFiltersByName>,
CompareChannelFiltersByName>;
DependencyMap dependencies;
DependencyTracker dependencies;
std::vector<Filter> terminal_filters;
for (const auto& registration : registrations) {
if (filter_to_registration.count(registration->name_) > 0) {
const auto first =
filter_to_registration[registration->name_]->registration_source_;
const auto second = registration->registration_source_;
Crash(absl::StrCat("Duplicate registration of channel filter ",
registration->name_, "\nfirst: ", first.file(), ":",
first.line(), "\nsecond: ", second.file(), ":",
second.line()));
}
filter_to_registration[registration->name_] = registration.get();
if (registration->terminal_) {
CHECK(registration->after_.empty());
CHECK(registration->before_.empty());
@ -150,45 +255,22 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
std::move(registration->predicates_), registration->version_,
registration->ordering_, registration->registration_source_);
} else {
dependencies[registration->name_]; // Ensure it's in the map.
dependencies.Declare(registration.get());
}
}
for (const auto& registration : registrations) {
if (registration->terminal_) continue;
CHECK_GT(filter_to_registration.count(registration->name_), 0u);
for (UniqueTypeName after : registration->after_) {
if (filter_to_registration.count(after) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", after,
" not registered, but is referenced in the after clause of ",
registration->name_, " when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[registration->name_].insert(after);
dependencies.InsertEdge(after, registration->name_);
}
for (UniqueTypeName before : registration->before_) {
if (filter_to_registration.count(before) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", before,
" not registered, but is referenced in the before clause of ",
registration->name_, " when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[before].insert(registration->name_);
dependencies.InsertEdge(registration->name_, before);
}
if (registration->before_all_) {
for (const auto& other : registrations) {
if (other.get() == registration.get()) continue;
if (other->terminal_) continue;
dependencies[other->name_].insert(registration->name_);
dependencies.InsertEdge(registration->name_, other->name_);
}
}
}
@ -196,43 +278,13 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
// We can simply iterate through and add anything with no dependency.
// We then remove that filter from the dependency list of all other filters.
// We repeat until we have no more filters to add.
auto build_remaining_dependency_graph =
[](const DependencyMap& dependencies) {
std::string result;
for (const auto& p : dependencies) {
absl::StrAppend(&result, p.first, " ->");
for (const auto& d : p.second) {
absl::StrAppend(&result, " ", d);
}
absl::StrAppend(&result, "\n");
}
return result;
};
const DependencyMap original = dependencies;
auto take_ready_dependency = [&]() {
for (auto it = dependencies.begin(); it != dependencies.end(); ++it) {
if (it->second.empty()) {
auto r = it->first;
dependencies.erase(it);
return r;
}
}
Crash(absl::StrCat(
"Unresolvable graph of channel filters - remaining graph:\n",
build_remaining_dependency_graph(dependencies), "original:\n",
build_remaining_dependency_graph(original)));
};
dependencies.FinishDependencyMap();
std::vector<Filter> filters;
while (!dependencies.empty()) {
auto filter = take_ready_dependency();
auto* registration = filter_to_registration[filter];
while (auto registration = dependencies.Next()) {
filters.emplace_back(
filter, registration->filter_, registration->filter_adder_,
registration->name_, registration->filter_, registration->filter_adder_,
std::move(registration->predicates_), registration->version_,
registration->ordering_, registration->registration_source_);
for (auto& p : dependencies) {
p.second.erase(filter);
}
}
// Collect post processors that need to be applied.
// We've already ensured the one-per-slot constraint, so now we can just
@ -244,6 +296,35 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
}
// Log out the graph we built if that's been requested.
if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
PrintChannelStackTrace(type, registrations, dependencies, filters,
terminal_filters);
}
// Check if there are no terminal filters: this would be an error.
// GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
// condition here.
// Right now we only log: many tests end up with a core configuration that
// is invalid.
// TODO(ctiller): evaluate if we can turn this into a crash one day.
// Right now it forces too many tests to know about channel initialization,
// either by supplying a valid configuration or by including an opt-out flag.
if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
gpr_log(
GPR_ERROR,
"No terminal filters registered for channel stack type %s; this is "
"common for unit tests messing with CoreConfiguration, but will result "
"in a ChannelInit::CreateStack that never completes successfully.",
grpc_channel_stack_type_string(type));
}
return StackConfig{std::move(filters), std::move(terminal_filters),
std::move(post_processor_functions)};
};
void ChannelInit::PrintChannelStackTrace(
grpc_channel_stack_type type,
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
const DependencyTracker& dependencies, const std::vector<Filter>& filters,
const std::vector<Filter>& terminal_filters) {
// It can happen that multiple threads attempt to construct a core config at
// once.
// This is benign - the first one wins and others are discarded.
@ -253,26 +334,30 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
MutexLock lock(m);
// List the channel stack type (since we'll be repeatedly printing graphs in
// this loop).
LOG(INFO) << "ORDERED CHANNEL STACK "
<< grpc_channel_stack_type_string(type) << ":";
LOG(INFO) << "ORDERED CHANNEL STACK " << grpc_channel_stack_type_string(type)
<< ":";
// First build up a map of filter -> file:line: strings, because it helps
// the readability of this log to get later fields aligned vertically.
std::map<UniqueTypeName, std::string> loc_strs;
absl::flat_hash_map<UniqueTypeName, std::string> loc_strs;
size_t max_loc_str_len = 0;
size_t max_filter_name_len = 0;
auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration,
auto add_loc_str = [&max_loc_str_len, &loc_strs, &registrations,
&max_filter_name_len](UniqueTypeName name) {
max_filter_name_len = std::max(name.name().length(), max_filter_name_len);
const auto registration =
filter_to_registration[name]->registration_source_;
absl::string_view file = registration.file();
for (const auto& registration : registrations) {
if (registration->name_ == name) {
auto source = registration->registration_source_;
absl::string_view file = source.file();
auto slash_pos = file.rfind('/');
if (slash_pos != file.npos) {
file = file.substr(slash_pos + 1);
}
auto loc_str = absl::StrCat(file, ":", registration.line(), ":");
auto loc_str = absl::StrCat(file, ":", source.line(), ":");
max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
loc_strs.emplace(name, std::move(loc_str));
break;
}
}
};
for (const auto& filter : filters) {
add_loc_str(filter.name);
@ -297,17 +382,13 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
// - If B is registered with .Before({A}), then A will be 'after' B here.
// - If B is registered as BeforeAll, then A will be 'after' B here.
for (const auto& filter : filters) {
auto dep_it = original.find(filter.name);
auto after = dependencies.DependenciesFor(filter.name);
std::string after_str;
if (dep_it != original.end() && !dep_it->second.empty()) {
if (!after.empty()) {
after_str = absl::StrCat(
std::string(max_filter_name_len + 1 - filter.name.name().length(),
' '),
"after ",
absl::StrJoin(dep_it->second, ", ",
[](std::string* out, UniqueTypeName name) {
out->append(std::string(name.name()));
}));
"after ", absl::StrJoin(after, ", "));
} else {
after_str =
std::string(max_filter_name_len - filter.name.name().length(), ' ');
@ -326,25 +407,6 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
LOG(INFO) << filter_str;
}
}
// Check if there are no terminal filters: this would be an error.
// GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
// condition here.
// Right now we only log: many tests end up with a core configuration that
// is invalid.
// TODO(ctiller): evaluate if we can turn this into a crash one day.
// Right now it forces too many tests to know about channel initialization,
// either by supplying a valid configuration or by including an opt-out flag.
if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
gpr_log(
GPR_ERROR,
"No terminal filters registered for channel stack type %s; this is "
"common for unit tests messing with CoreConfiguration, but will result "
"in a ChannelInit::CreateStack that never completes successfully.",
grpc_channel_stack_type_string(type));
}
return StackConfig{std::move(filters), std::move(terminal_filters),
std::move(post_processor_functions)};
};
ChannelInit ChannelInit::Builder::Build() {
ChannelInit result;

@ -348,6 +348,8 @@ class ChannelInit {
using CreatedType =
typename decltype(T::Create(ChannelArgs(), {}))::value_type;
class DependencyTracker;
struct Filter {
Filter(UniqueTypeName name, const grpc_channel_filter* filter,
FilterAdder filter_adder, std::vector<InclusionPredicate> predicates,
@ -380,6 +382,12 @@ class ChannelInit {
static StackConfig BuildStackConfig(
const std::vector<std::unique_ptr<FilterRegistration>>& registrations,
PostProcessor* post_processors, grpc_channel_stack_type type);
static void PrintChannelStackTrace(
grpc_channel_stack_type type,
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
const DependencyTracker& dependencies, const std::vector<Filter>& filters,
const std::vector<Filter>& terminal_filters);
};
} // namespace grpc_core

Loading…
Cancel
Save