diff --git a/src/core/lib/gprpp/unique_type_name.h b/src/core/lib/gprpp/unique_type_name.h index 5e3f74320c0..be158426d8c 100644 --- a/src/core/lib/gprpp/unique_type_name.h +++ b/src/core/lib/gprpp/unique_type_name.h @@ -20,6 +20,7 @@ #include #include "absl/strings/string_view.h" +#include "ref_counted_ptr.h" #include @@ -80,6 +81,12 @@ class UniqueTypeName { return name_.data() < other.name_.data(); } + template + friend H AbslHashValue(H h, const UniqueTypeName& name) { + return H::combine(std::move(h), + static_cast(name.name_.data())); + } + int Compare(const UniqueTypeName& other) const { return QsortCompare(name_.data(), other.name_.data()); } diff --git a/src/core/lib/surface/channel_init.cc b/src/core/lib/surface/channel_init.cc index b22c19fc081..cde1a289544 100644 --- a/src/core/lib/surface/channel_init.cc +++ b/src/core/lib/surface/channel_init.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/surface/channel_stack_type.h" namespace grpc_core { @@ -112,6 +114,124 @@ ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter( return *filters_[type].back(); } +class ChannelInit::DependencyTracker { + public: + // Declare that a filter exists. + void Declare(FilterRegistration* registration) { + nodes_.emplace(registration->name_, registration); + } + // Insert an edge from a to b + // Both nodes must be declared. + void InsertEdge(UniqueTypeName a, UniqueTypeName b) { + auto it_a = nodes_.find(a); + auto it_b = nodes_.find(b); + if (it_a == nodes_.end()) { + LOG(ERROR) << "gRPC Filter " << a.name() + << " was not declared before adding an edge to " << b.name(); + return; + } + if (it_b == nodes_.end()) { + LOG(ERROR) << "gRPC Filter " << b.name() + << " was not declared before adding an edge from " << a.name(); + return; + } + auto& node_a = it_a->second; + auto& node_b = it_b->second; + node_a.dependents.push_back(&node_b); + node_b.all_dependencies.push_back(a); + ++node_b.waiting_dependencies; + } + + // Finish the dependency graph and begin iteration. + void FinishDependencyMap() { + for (auto& p : nodes_) { + if (p.second.waiting_dependencies == 0) { + ready_dependencies_.emplace(&p.second); + } + } + } + + FilterRegistration* Next() { + if (ready_dependencies_.empty()) { + CHECK_EQ(nodes_taken_, nodes_.size()) << "Unresolvable graph of channel " + "filters:\n" + << GraphString(); + return nullptr; + } + auto next = ready_dependencies_.top(); + ready_dependencies_.pop(); + if (next.node->ordering() != Ordering::kDefault) { + // Constraint: if we use ordering other than default, then we must have an + // unambiguous pick. If there is ambiguity, we must fix it by adding + // explicit ordering constraints. + CHECK_NE(next.node->ordering(), + ready_dependencies_.top().node->ordering()) + << "Ambiguous ordering between " << next.node->name() << " and " + << ready_dependencies_.top().node->name(); + } + for (Node* dependent : next.node->dependents) { + CHECK_GT(dependent->waiting_dependencies, 0u); + --dependent->waiting_dependencies; + if (dependent->waiting_dependencies == 0) { + ready_dependencies_.emplace(dependent); + } + } + ++nodes_taken_; + return next.node->registration; + } + + // Debug helper to dump the graph + std::string GraphString() const { + std::string result; + for (const auto& p : nodes_) { + absl::StrAppend(&result, p.first, " ->"); + for (const auto& d : p.second.all_dependencies) { + absl::StrAppend(&result, " ", d); + } + absl::StrAppend(&result, "\n"); + } + return result; + } + + absl::Span DependenciesFor(UniqueTypeName name) const { + auto it = nodes_.find(name); + CHECK(it != nodes_.end()) << "Filter " << name.name() << " not found"; + return it->second.all_dependencies; + } + + private: + struct Node { + explicit Node(FilterRegistration* registration) + : registration(registration) {} + // Nodes that depend on this node + std::vector dependents; + // Nodes that this node depends on - for debugging purposes only + std::vector all_dependencies; + // The registration for this node + FilterRegistration* registration; + // Number of nodes this node is waiting on + size_t waiting_dependencies = 0; + + Ordering ordering() const { return registration->ordering_; } + absl::string_view name() const { return registration->name_.name(); } + }; + struct ReadyDependency { + explicit ReadyDependency(Node* node) : node(node) {} + Node* node; + bool operator<(const ReadyDependency& other) const { + // Sort first on ordering, and then lexically on name. + // The lexical sort means that the ordering is stable between builds + // (UniqueTypeName ordering is not stable between builds). + return node->ordering() < other.node->ordering() || + (node->ordering() == other.node->ordering() && + node->name() < other.node->name()); + } + }; + absl::flat_hash_map nodes_; + std::priority_queue ready_dependencies_; + size_t nodes_taken_ = 0; +}; + ChannelInit::StackConfig ChannelInit::BuildStackConfig( const std::vector>& registrations, @@ -122,24 +242,9 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig( // ensure algorithm ordering stability is deterministic for a given build. // We should not require this, but at the time of writing it's expected that // this will help overall stability. - std::map filter_to_registration; - using DependencyMap = - std::map, - CompareChannelFiltersByName>; - DependencyMap dependencies; + DependencyTracker dependencies; std::vector terminal_filters; for (const auto& registration : registrations) { - if (filter_to_registration.count(registration->name_) > 0) { - const auto first = - filter_to_registration[registration->name_]->registration_source_; - const auto second = registration->registration_source_; - Crash(absl::StrCat("Duplicate registration of channel filter ", - registration->name_, "\nfirst: ", first.file(), ":", - first.line(), "\nsecond: ", second.file(), ":", - second.line())); - } - filter_to_registration[registration->name_] = registration.get(); if (registration->terminal_) { CHECK(registration->after_.empty()); CHECK(registration->before_.empty()); @@ -150,45 +255,22 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig( std::move(registration->predicates_), registration->version_, registration->ordering_, registration->registration_source_); } else { - dependencies[registration->name_]; // Ensure it's in the map. + dependencies.Declare(registration.get()); } } for (const auto& registration : registrations) { if (registration->terminal_) continue; - CHECK_GT(filter_to_registration.count(registration->name_), 0u); for (UniqueTypeName after : registration->after_) { - if (filter_to_registration.count(after) == 0) { - gpr_log( - GPR_DEBUG, "%s", - absl::StrCat( - "Filter ", after, - " not registered, but is referenced in the after clause of ", - registration->name_, " when building channel stack ", - grpc_channel_stack_type_string(type)) - .c_str()); - continue; - } - dependencies[registration->name_].insert(after); + dependencies.InsertEdge(after, registration->name_); } for (UniqueTypeName before : registration->before_) { - if (filter_to_registration.count(before) == 0) { - gpr_log( - GPR_DEBUG, "%s", - absl::StrCat( - "Filter ", before, - " not registered, but is referenced in the before clause of ", - registration->name_, " when building channel stack ", - grpc_channel_stack_type_string(type)) - .c_str()); - continue; - } - dependencies[before].insert(registration->name_); + dependencies.InsertEdge(registration->name_, before); } if (registration->before_all_) { for (const auto& other : registrations) { if (other.get() == registration.get()) continue; if (other->terminal_) continue; - dependencies[other->name_].insert(registration->name_); + dependencies.InsertEdge(registration->name_, other->name_); } } } @@ -196,43 +278,13 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig( // We can simply iterate through and add anything with no dependency. // We then remove that filter from the dependency list of all other filters. // We repeat until we have no more filters to add. - auto build_remaining_dependency_graph = - [](const DependencyMap& dependencies) { - std::string result; - for (const auto& p : dependencies) { - absl::StrAppend(&result, p.first, " ->"); - for (const auto& d : p.second) { - absl::StrAppend(&result, " ", d); - } - absl::StrAppend(&result, "\n"); - } - return result; - }; - const DependencyMap original = dependencies; - auto take_ready_dependency = [&]() { - for (auto it = dependencies.begin(); it != dependencies.end(); ++it) { - if (it->second.empty()) { - auto r = it->first; - dependencies.erase(it); - return r; - } - } - Crash(absl::StrCat( - "Unresolvable graph of channel filters - remaining graph:\n", - build_remaining_dependency_graph(dependencies), "original:\n", - build_remaining_dependency_graph(original))); - }; + dependencies.FinishDependencyMap(); std::vector filters; - while (!dependencies.empty()) { - auto filter = take_ready_dependency(); - auto* registration = filter_to_registration[filter]; + while (auto registration = dependencies.Next()) { filters.emplace_back( - filter, registration->filter_, registration->filter_adder_, + registration->name_, registration->filter_, registration->filter_adder_, std::move(registration->predicates_), registration->version_, registration->ordering_, registration->registration_source_); - for (auto& p : dependencies) { - p.second.erase(filter); - } } // Collect post processors that need to be applied. // We've already ensured the one-per-slot constraint, so now we can just @@ -244,87 +296,8 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig( } // Log out the graph we built if that's been requested. if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) { - // It can happen that multiple threads attempt to construct a core config at - // once. - // This is benign - the first one wins and others are discarded. - // However, it messes up our logging and makes it harder to reason about the - // graph, so we add some protection here. - static Mutex* const m = new Mutex(); - MutexLock lock(m); - // List the channel stack type (since we'll be repeatedly printing graphs in - // this loop). - LOG(INFO) << "ORDERED CHANNEL STACK " - << grpc_channel_stack_type_string(type) << ":"; - // First build up a map of filter -> file:line: strings, because it helps - // the readability of this log to get later fields aligned vertically. - std::map loc_strs; - size_t max_loc_str_len = 0; - size_t max_filter_name_len = 0; - auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration, - &max_filter_name_len](UniqueTypeName name) { - max_filter_name_len = std::max(name.name().length(), max_filter_name_len); - const auto registration = - filter_to_registration[name]->registration_source_; - absl::string_view file = registration.file(); - auto slash_pos = file.rfind('/'); - if (slash_pos != file.npos) { - file = file.substr(slash_pos + 1); - } - auto loc_str = absl::StrCat(file, ":", registration.line(), ":"); - max_loc_str_len = std::max(max_loc_str_len, loc_str.length()); - loc_strs.emplace(name, std::move(loc_str)); - }; - for (const auto& filter : filters) { - add_loc_str(filter.name); - } - for (const auto& terminal : terminal_filters) { - add_loc_str(terminal.name); - } - for (auto& loc_str : loc_strs) { - loc_str.second = absl::StrCat( - loc_str.second, - std::string(max_loc_str_len + 2 - loc_str.second.length(), ' ')); - } - // For each regular filter, print the location registered, the name of the - // filter, and if it needed to occur after some other filters list those - // filters too. - // Note that we use the processed after list here - earlier we turned Before - // registrations into After registrations and we used those converted - // registrations to build the final ordering. - // If you're trying to track down why 'A' is listed as after 'B', look at - // the following: - // - If A is registered with .After({B}), then A will be 'after' B here. - // - If B is registered with .Before({A}), then A will be 'after' B here. - // - If B is registered as BeforeAll, then A will be 'after' B here. - for (const auto& filter : filters) { - auto dep_it = original.find(filter.name); - std::string after_str; - if (dep_it != original.end() && !dep_it->second.empty()) { - after_str = absl::StrCat( - std::string(max_filter_name_len + 1 - filter.name.name().length(), - ' '), - "after ", - absl::StrJoin(dep_it->second, ", ", - [](std::string* out, UniqueTypeName name) { - out->append(std::string(name.name())); - })); - } else { - after_str = - std::string(max_filter_name_len - filter.name.name().length(), ' '); - } - LOG(INFO) << " " << loc_strs[filter.name] << filter.name << after_str - << " [" << filter.ordering << "/" << filter.version << "]"; - } - // Finally list out the terminal filters and where they were registered - // from. - for (const auto& terminal : terminal_filters) { - const auto filter_str = absl::StrCat( - " ", loc_strs[terminal.name], terminal.name, - std::string(max_filter_name_len + 1 - terminal.name.name().length(), - ' '), - "[terminal]"); - LOG(INFO) << filter_str; - } + PrintChannelStackTrace(type, registrations, dependencies, filters, + terminal_filters); } // Check if there are no terminal filters: this would be an error. // GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that @@ -346,6 +319,95 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig( std::move(post_processor_functions)}; }; +void ChannelInit::PrintChannelStackTrace( + grpc_channel_stack_type type, + const std::vector>& + registrations, + const DependencyTracker& dependencies, const std::vector& filters, + const std::vector& terminal_filters) { + // It can happen that multiple threads attempt to construct a core config at + // once. + // This is benign - the first one wins and others are discarded. + // However, it messes up our logging and makes it harder to reason about the + // graph, so we add some protection here. + static Mutex* const m = new Mutex(); + MutexLock lock(m); + // List the channel stack type (since we'll be repeatedly printing graphs in + // this loop). + LOG(INFO) << "ORDERED CHANNEL STACK " << grpc_channel_stack_type_string(type) + << ":"; + // First build up a map of filter -> file:line: strings, because it helps + // the readability of this log to get later fields aligned vertically. + absl::flat_hash_map loc_strs; + size_t max_loc_str_len = 0; + size_t max_filter_name_len = 0; + auto add_loc_str = [&max_loc_str_len, &loc_strs, ®istrations, + &max_filter_name_len](UniqueTypeName name) { + max_filter_name_len = std::max(name.name().length(), max_filter_name_len); + for (const auto& registration : registrations) { + if (registration->name_ == name) { + auto source = registration->registration_source_; + absl::string_view file = source.file(); + auto slash_pos = file.rfind('/'); + if (slash_pos != file.npos) { + file = file.substr(slash_pos + 1); + } + auto loc_str = absl::StrCat(file, ":", source.line(), ":"); + max_loc_str_len = std::max(max_loc_str_len, loc_str.length()); + loc_strs.emplace(name, std::move(loc_str)); + break; + } + } + }; + for (const auto& filter : filters) { + add_loc_str(filter.name); + } + for (const auto& terminal : terminal_filters) { + add_loc_str(terminal.name); + } + for (auto& loc_str : loc_strs) { + loc_str.second = absl::StrCat( + loc_str.second, + std::string(max_loc_str_len + 2 - loc_str.second.length(), ' ')); + } + // For each regular filter, print the location registered, the name of the + // filter, and if it needed to occur after some other filters list those + // filters too. + // Note that we use the processed after list here - earlier we turned Before + // registrations into After registrations and we used those converted + // registrations to build the final ordering. + // If you're trying to track down why 'A' is listed as after 'B', look at + // the following: + // - If A is registered with .After({B}), then A will be 'after' B here. + // - If B is registered with .Before({A}), then A will be 'after' B here. + // - If B is registered as BeforeAll, then A will be 'after' B here. + for (const auto& filter : filters) { + auto after = dependencies.DependenciesFor(filter.name); + std::string after_str; + if (!after.empty()) { + after_str = absl::StrCat( + std::string(max_filter_name_len + 1 - filter.name.name().length(), + ' '), + "after ", absl::StrJoin(after, ", ")); + } else { + after_str = + std::string(max_filter_name_len - filter.name.name().length(), ' '); + } + LOG(INFO) << " " << loc_strs[filter.name] << filter.name << after_str + << " [" << filter.ordering << "/" << filter.version << "]"; + } + // Finally list out the terminal filters and where they were registered + // from. + for (const auto& terminal : terminal_filters) { + const auto filter_str = absl::StrCat( + " ", loc_strs[terminal.name], terminal.name, + std::string(max_filter_name_len + 1 - terminal.name.name().length(), + ' '), + "[terminal]"); + LOG(INFO) << filter_str; + } +} + ChannelInit ChannelInit::Builder::Build() { ChannelInit result; for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) { diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index c81b31a00c2..6bc9c318a08 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -348,6 +348,8 @@ class ChannelInit { using CreatedType = typename decltype(T::Create(ChannelArgs(), {}))::value_type; + class DependencyTracker; + struct Filter { Filter(UniqueTypeName name, const grpc_channel_filter* filter, FilterAdder filter_adder, std::vector predicates, @@ -380,6 +382,12 @@ class ChannelInit { static StackConfig BuildStackConfig( const std::vector>& registrations, PostProcessor* post_processors, grpc_channel_stack_type type); + static void PrintChannelStackTrace( + grpc_channel_stack_type type, + const std::vector>& + registrations, + const DependencyTracker& dependencies, const std::vector& filters, + const std::vector& terminal_filters); }; } // namespace grpc_core