Merge branch 'master' into excise-the-thing

pull/37003/head
Craig Tiller 5 months ago
commit 0e97262e21
  1. 136
      CMakeLists.txt
  2. 16
      build_autogenerated.yaml
  3. 1
      doc/grpc_xds_features.md
  4. 1
      grpc.def
  5. 3
      include/grpc/support/log.h
  6. 3
      include/grpcpp/security/server_credentials.h
  7. 42
      src/core/ext/transport/chaotic_good/client_transport.cc
  8. 13
      src/core/lib/gprpp/unique_type_name.h
  9. 1
      src/core/lib/iomgr/tcp_posix.cc
  10. 1
      src/core/lib/promise/party.cc
  11. 4
      src/core/lib/promise/party.h
  12. 438
      src/core/lib/surface/channel_init.cc
  13. 174
      src/core/lib/surface/channel_init.h
  14. 80
      src/core/lib/transport/call_filters.cc
  15. 381
      src/core/lib/transport/call_filters.h
  16. 12
      src/core/util/log.cc
  17. 2
      src/cpp/ext/filters/census/grpc_plugin.cc
  18. 1
      src/python/grpcio/grpc/aio/_base_server.py
  19. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  20. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  21. 12
      test/core/call/BUILD
  22. 12
      test/core/client_channel/BUILD
  23. 10
      test/core/event_engine/posix/BUILD
  24. 9
      test/core/experiments/BUILD
  25. 8
      test/core/filters/BUILD
  26. 1
      test/core/gprpp/BUILD
  27. 21
      test/core/gprpp/unique_type_name_test.cc
  28. 10
      test/core/load_balancing/BUILD
  29. 12
      test/core/promise/BUILD
  30. 65
      test/core/surface/channel_init_test.cc
  31. 12
      test/core/transport/BUILD
  32. 108
      test/core/transport/call_filters_test.cc
  33. 10
      test/core/transport/call_spine_benchmarks.h
  34. 142
      test/cpp/microbenchmarks/BUILD
  35. 37
      test/cpp/microbenchmarks/grpc_benchmark_config.bzl
  36. 4
      tools/bazelify_tests/dockerimage_current_versions.bzl
  37. 1
      tools/dockerfile/distribtest/python_dev_fedora38_x64.current_version
  38. 1
      tools/dockerfile/distribtest/python_dev_fedora39_x64.current_version
  39. 2
      tools/dockerfile/distribtest/python_dev_fedora39_x64/Dockerfile
  40. 1
      tools/dockerfile/distribtest/python_fedora38_x64.current_version
  41. 1
      tools/dockerfile/distribtest/python_fedora39_x64.current_version
  42. 2
      tools/dockerfile/distribtest/python_fedora39_x64/Dockerfile
  43. 4
      tools/run_tests/artifacts/distribtest_targets.py
  44. 30
      tools/run_tests/generated/tests.json
  45. 5
      tools/run_tests/sanity/banned_functions.py

136
CMakeLists.txt generated

@ -895,8 +895,12 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c bm_client_channel)
endif()
add_dependencies(buildtests_c bm_experiments)
add_dependencies(buildtests_c bm_http_client_filter)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c bm_experiments)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c bm_http_client_filter)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c bm_party)
endif()
@ -5946,81 +5950,82 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(bm_experiments
test/core/experiments/bm_experiments.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(bm_experiments
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
"GRPCXX_DLL_IMPORTS"
)
add_executable(bm_experiments
test/core/experiments/bm_experiments.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(bm_experiments
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
endif()
endif()
endif()
target_compile_features(bm_experiments PUBLIC cxx_std_14)
target_include_directories(bm_experiments
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_compile_features(bm_experiments PUBLIC cxx_std_14)
target_include_directories(bm_experiments
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(bm_experiments
${_gRPC_ALLTARGETS_LIBRARIES}
absl::btree
${_gRPC_BENCHMARK_LIBRARIES}
grpc++
grpc_test_util
)
target_link_libraries(bm_experiments
${_gRPC_ALLTARGETS_LIBRARIES}
${_gRPC_BENCHMARK_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(bm_http_client_filter
test/core/filters/bm_http_client_filter.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(bm_http_client_filter
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
add_executable(bm_http_client_filter
test/core/filters/bm_http_client_filter.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(bm_http_client_filter
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
endif()
endif()
endif()
target_compile_features(bm_http_client_filter PUBLIC cxx_std_14)
target_include_directories(bm_http_client_filter
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_compile_features(bm_http_client_filter PUBLIC cxx_std_14)
target_include_directories(bm_http_client_filter
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(bm_http_client_filter
${_gRPC_ALLTARGETS_LIBRARIES}
${_gRPC_BENCHMARK_LIBRARIES}
grpc
)
target_link_libraries(bm_http_client_filter
${_gRPC_ALLTARGETS_LIBRARIES}
${_gRPC_BENCHMARK_LIBRARIES}
grpc
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
@ -33053,6 +33058,7 @@ target_include_directories(unique_type_name_test
target_link_libraries(unique_type_name_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::flat_hash_map
absl::str_format
)

@ -5199,12 +5199,16 @@ targets:
src:
- test/core/experiments/bm_experiments.cc
deps:
- absl/container:btree
- benchmark
- grpc++
- grpc_test_util
args:
- --benchmark_min_time=0.001s
benchmark: true
defaults: benchmark
platforms:
- linux
- posix
uses_polling: false
- name: bm_http_client_filter
build: test
language: c
@ -5219,6 +5223,9 @@ targets:
- --benchmark_min_time=0.001s
benchmark: true
defaults: benchmark
platforms:
- linux
- posix
uses_polling: false
- name: bm_party
build: test
@ -5301,6 +5308,8 @@ targets:
- absl/types:span
- benchmark
- gpr
args:
- --benchmark_min_time=0.001s
benchmark: true
defaults: benchmark
platforms:
@ -13815,6 +13824,8 @@ targets:
- gtest
- benchmark
- grpc_test_util
args:
- --benchmark_min_time=0.001s
benchmark: true
defaults: benchmark
platforms:
@ -21444,6 +21455,7 @@ targets:
- test/core/gprpp/unique_type_name_test.cc
deps:
- gtest
- absl/container:flat_hash_map
- absl/strings:str_format
uses_polling: false
- name: unknown_frame_bad_client_test

@ -80,3 +80,4 @@ Pick First | [A62](https://github.com/grpc/proposal/blob/master/A62-pick-first.m
LRS Custom Metrics Support | [A64](https://github.com/grpc/proposal/blob/master/A64-lrs-custom-metrics.md) | v1.54.0 | | | |
mTLS Credentials in xDS Bootstrap File | [A65](https://github.com/grpc/proposal/blob/master/A65-xds-mtls-creds-in-bootstrap.md) | v1.65.0 | | v1.61.0 | |
Stateful Session Affinity | [A55](https://github.com/grpc/proposal/blob/master/A55-xds-stateful-session-affinity.md), [A60](https://github.com/grpc/proposal/blob/master/A60-xds-stateful-session-affinity-weighted-clusters.md), [A75](https://github.com/grpc/proposal/blob/master/A75-xds-aggregate-cluster-behavior-fixes.md) | v1.61.0 | | | |
xDS Locality label for OpenTelemetry metrics | [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md) | v1.63.0 (C++) | v1.64.0 | | |

1
grpc.def generated

@ -230,7 +230,6 @@ EXPORTS
gpr_free_aligned
gpr_cpu_num_cores
gpr_cpu_current_cpu
gpr_log_severity_string
gpr_log
gpr_should_log
gpr_log_message

@ -41,9 +41,6 @@ typedef enum gpr_log_severity {
GPR_LOG_SEVERITY_ERROR
} gpr_log_severity;
/** Returns a string representation of the log severity */
GPRAPI const char* gpr_log_severity_string(gpr_log_severity severity);
/** Macros to build log contexts at various severity levels */
#define GPR_DEBUG __FILE__, __LINE__, GPR_LOG_SEVERITY_DEBUG
#define GPR_INFO __FILE__, __LINE__, GPR_LOG_SEVERITY_INFO

@ -116,9 +116,6 @@ std::shared_ptr<ServerCredentials> AltsServerCredentials(
const AltsServerCredentialsOptions& options);
/// Builds Local ServerCredentials.
std::shared_ptr<ServerCredentials> AltsServerCredentials(
const AltsServerCredentialsOptions& options);
std::shared_ptr<ServerCredentials> LocalServerCredentials(
grpc_local_connect_type type);

@ -316,27 +316,29 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
// At this point, the connection is set up.
// Start sending data frames.
call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id << " finished with "
<< result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
CancelFrame frame;
frame.stream_id = stream_id;
if (!sender.UnbufferedImmediateSend(std::move(frame))) {
call_handler.SpawnGuarded(
"outbound_loop", [self = RefAsSubclass<ChaoticGoodClientTransport>(),
call_handler]() mutable {
const uint32_t stream_id = self->MakeStream(call_handler);
return Map(self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
});
<< "CHAOTIC_GOOD: Call " << stream_id
<< " finished with " << result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
CancelFrame frame;
frame.stream_id = stream_id;
if (!sender.UnbufferedImmediateSend(std::move(frame))) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
});
}
void ChaoticGoodClientTransport::PerformOp(grpc_transport_op* op) {

@ -70,13 +70,6 @@ class UniqueTypeName {
std::string* name_;
};
// Copyable.
UniqueTypeName(const UniqueTypeName& other) : name_(other.name_) {}
UniqueTypeName& operator=(const UniqueTypeName& other) {
name_ = other.name_;
return *this;
}
bool operator==(const UniqueTypeName& other) const {
return name_.data() == other.name_.data();
}
@ -87,6 +80,12 @@ class UniqueTypeName {
return name_.data() < other.name_.data();
}
template <typename H>
friend H AbslHashValue(H h, const UniqueTypeName& name) {
return H::combine(std::move(h),
static_cast<const void*>(name.name_.data()));
}
int Compare(const UniqueTypeName& other) const {
return QsortCompare(name_.data(), other.name_.data());
}

@ -806,7 +806,6 @@ static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
#endif
static void tcp_destroy(grpc_endpoint* ep) {
gpr_log(GPR_INFO, "IOMGR endpoint shutdown");
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, absl::UnavailableError("endpoint shutdown"));

@ -178,6 +178,7 @@ Party::Participant::~Participant() {
Party::~Party() {}
void Party::CancelRemainingParticipants() {
if (!sync_.has_participants()) return;
ScopedActivity activity(this);
for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
if (auto* p =

@ -224,6 +224,10 @@ class PartySyncUsingAtomics {
return iteration_.load(std::memory_order_relaxed);
}
bool has_participants() const {
return (state_.load(std::memory_order_relaxed) & kAllocatedMask) != 0;
}
private:
bool UnreffedLast();

@ -22,6 +22,7 @@
#include <algorithm>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <type_traits>
@ -39,6 +40,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/surface/channel_stack_type.h"
namespace grpc_core {
@ -47,17 +49,16 @@ UniqueTypeName (*NameFromChannelFilter)(const grpc_channel_filter*);
namespace {
struct CompareChannelFiltersByName {
bool operator()(const grpc_channel_filter* a,
const grpc_channel_filter* b) const {
bool operator()(UniqueTypeName a, UniqueTypeName b) const {
// Compare lexicographically instead of by pointer value so that different
// builds make the same choices.
return NameFromChannelFilter(a).name() < NameFromChannelFilter(b).name();
return a.name() < b.name();
}
};
} // namespace
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
std::initializer_list<const grpc_channel_filter*> filters) {
std::initializer_list<UniqueTypeName> filters) {
for (auto filter : filters) {
after_.push_back(filter);
}
@ -65,7 +66,7 @@ ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
}
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
std::initializer_list<const grpc_channel_filter*> filters) {
std::initializer_list<UniqueTypeName> filters) {
for (auto filter : filters) {
before_.push_back(filter);
}
@ -105,13 +106,133 @@ ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
}
ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
FilterAdder filter_adder, SourceLocation registration_source) {
grpc_channel_stack_type type, UniqueTypeName name,
const grpc_channel_filter* filter, FilterAdder filter_adder,
SourceLocation registration_source) {
filters_[type].emplace_back(std::make_unique<FilterRegistration>(
filter, filter_adder, registration_source));
name, filter, filter_adder, registration_source));
return *filters_[type].back();
}
class ChannelInit::DependencyTracker {
public:
// Declare that a filter exists.
void Declare(FilterRegistration* registration) {
nodes_.emplace(registration->name_, registration);
}
// Insert an edge from a to b
// Both nodes must be declared.
void InsertEdge(UniqueTypeName a, UniqueTypeName b) {
auto it_a = nodes_.find(a);
auto it_b = nodes_.find(b);
if (it_a == nodes_.end()) {
LOG(ERROR) << "gRPC Filter " << a.name()
<< " was not declared before adding an edge to " << b.name();
return;
}
if (it_b == nodes_.end()) {
LOG(ERROR) << "gRPC Filter " << b.name()
<< " was not declared before adding an edge from " << a.name();
return;
}
auto& node_a = it_a->second;
auto& node_b = it_b->second;
node_a.dependents.push_back(&node_b);
node_b.all_dependencies.push_back(a);
++node_b.waiting_dependencies;
}
// Finish the dependency graph and begin iteration.
void FinishDependencyMap() {
for (auto& p : nodes_) {
if (p.second.waiting_dependencies == 0) {
ready_dependencies_.emplace(&p.second);
}
}
}
FilterRegistration* Next() {
if (ready_dependencies_.empty()) {
CHECK_EQ(nodes_taken_, nodes_.size()) << "Unresolvable graph of channel "
"filters:\n"
<< GraphString();
return nullptr;
}
auto next = ready_dependencies_.top();
ready_dependencies_.pop();
if (!ready_dependencies_.empty() &&
next.node->ordering() != Ordering::kDefault) {
// Constraint: if we use ordering other than default, then we must have an
// unambiguous pick. If there is ambiguity, we must fix it by adding
// explicit ordering constraints.
CHECK_NE(next.node->ordering(),
ready_dependencies_.top().node->ordering())
<< "Ambiguous ordering between " << next.node->name() << " and "
<< ready_dependencies_.top().node->name();
}
for (Node* dependent : next.node->dependents) {
CHECK_GT(dependent->waiting_dependencies, 0u);
--dependent->waiting_dependencies;
if (dependent->waiting_dependencies == 0) {
ready_dependencies_.emplace(dependent);
}
}
++nodes_taken_;
return next.node->registration;
}
// Debug helper to dump the graph
std::string GraphString() const {
std::string result;
for (const auto& p : nodes_) {
absl::StrAppend(&result, p.first, " ->");
for (const auto& d : p.second.all_dependencies) {
absl::StrAppend(&result, " ", d);
}
absl::StrAppend(&result, "\n");
}
return result;
}
absl::Span<const UniqueTypeName> DependenciesFor(UniqueTypeName name) const {
auto it = nodes_.find(name);
CHECK(it != nodes_.end()) << "Filter " << name.name() << " not found";
return it->second.all_dependencies;
}
private:
struct Node {
explicit Node(FilterRegistration* registration)
: registration(registration) {}
// Nodes that depend on this node
std::vector<Node*> dependents;
// Nodes that this node depends on - for debugging purposes only
std::vector<UniqueTypeName> all_dependencies;
// The registration for this node
FilterRegistration* registration;
// Number of nodes this node is waiting on
size_t waiting_dependencies = 0;
Ordering ordering() const { return registration->ordering_; }
absl::string_view name() const { return registration->name_.name(); }
};
struct ReadyDependency {
explicit ReadyDependency(Node* node) : node(node) {}
Node* node;
bool operator<(const ReadyDependency& other) const {
// Sort first on ordering, and then lexically on name.
// The lexical sort means that the ordering is stable between builds
// (UniqueTypeName ordering is not stable between builds).
return node->ordering() > other.node->ordering() ||
(node->ordering() == other.node->ordering() &&
node->name() > other.node->name());
}
};
absl::flat_hash_map<UniqueTypeName, Node> nodes_;
std::priority_queue<ReadyDependency> ready_dependencies_;
size_t nodes_taken_ = 0;
};
ChannelInit::StackConfig ChannelInit::BuildStackConfig(
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
@ -122,72 +243,35 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
// ensure algorithm ordering stability is deterministic for a given build.
// We should not require this, but at the time of writing it's expected that
// this will help overall stability.
using F = const grpc_channel_filter*;
std::map<F, FilterRegistration*> filter_to_registration;
using DependencyMap = std::map<F, std::set<F, CompareChannelFiltersByName>,
CompareChannelFiltersByName>;
DependencyMap dependencies;
DependencyTracker dependencies;
std::vector<Filter> terminal_filters;
for (const auto& registration : registrations) {
if (filter_to_registration.count(registration->filter_) > 0) {
const auto first =
filter_to_registration[registration->filter_]->registration_source_;
const auto second = registration->registration_source_;
Crash(absl::StrCat("Duplicate registration of channel filter ",
NameFromChannelFilter(registration->filter_),
"\nfirst: ", first.file(), ":", first.line(),
"\nsecond: ", second.file(), ":", second.line()));
}
filter_to_registration[registration->filter_] = registration.get();
if (registration->terminal_) {
CHECK(registration->after_.empty());
CHECK(registration->before_.empty());
CHECK(!registration->before_all_);
CHECK_EQ(registration->ordering_, Ordering::kDefault);
terminal_filters.emplace_back(
registration->filter_, nullptr, std::move(registration->predicates_),
registration->skip_v3_, registration->registration_source_);
registration->name_, registration->filter_, nullptr,
std::move(registration->predicates_), registration->version_,
registration->ordering_, registration->registration_source_);
} else {
dependencies[registration->filter_]; // Ensure it's in the map.
dependencies.Declare(registration.get());
}
}
for (const auto& registration : registrations) {
if (registration->terminal_) continue;
CHECK_GT(filter_to_registration.count(registration->filter_), 0u);
for (F after : registration->after_) {
if (filter_to_registration.count(after) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", NameFromChannelFilter(after),
" not registered, but is referenced in the after clause of ",
NameFromChannelFilter(registration->filter_),
" when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[registration->filter_].insert(after);
for (UniqueTypeName after : registration->after_) {
dependencies.InsertEdge(after, registration->name_);
}
for (F before : registration->before_) {
if (filter_to_registration.count(before) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", NameFromChannelFilter(before),
" not registered, but is referenced in the before clause of ",
NameFromChannelFilter(registration->filter_),
" when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[before].insert(registration->filter_);
for (UniqueTypeName before : registration->before_) {
dependencies.InsertEdge(registration->name_, before);
}
if (registration->before_all_) {
for (const auto& other : registrations) {
if (other.get() == registration.get()) continue;
if (other->terminal_) continue;
dependencies[other->filter_].insert(registration->filter_);
dependencies.InsertEdge(registration->name_, other->name_);
}
}
}
@ -195,43 +279,13 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
// We can simply iterate through and add anything with no dependency.
// We then remove that filter from the dependency list of all other filters.
// We repeat until we have no more filters to add.
auto build_remaining_dependency_graph =
[](const DependencyMap& dependencies) {
std::string result;
for (const auto& p : dependencies) {
absl::StrAppend(&result, NameFromChannelFilter(p.first), " ->");
for (const auto& d : p.second) {
absl::StrAppend(&result, " ", NameFromChannelFilter(d));
}
absl::StrAppend(&result, "\n");
}
return result;
};
const DependencyMap original = dependencies;
auto take_ready_dependency = [&]() {
for (auto it = dependencies.begin(); it != dependencies.end(); ++it) {
if (it->second.empty()) {
auto r = it->first;
dependencies.erase(it);
return r;
}
}
Crash(absl::StrCat(
"Unresolvable graph of channel filters - remaining graph:\n",
build_remaining_dependency_graph(dependencies), "original:\n",
build_remaining_dependency_graph(original)));
};
dependencies.FinishDependencyMap();
std::vector<Filter> filters;
while (!dependencies.empty()) {
auto filter = take_ready_dependency();
auto* registration = filter_to_registration[filter];
filters.emplace_back(filter, registration->filter_adder_,
std::move(registration->predicates_),
registration->skip_v3_,
registration->registration_source_);
for (auto& p : dependencies) {
p.second.erase(filter);
}
while (auto registration = dependencies.Next()) {
filters.emplace_back(
registration->name_, registration->filter_, registration->filter_adder_,
std::move(registration->predicates_), registration->version_,
registration->ordering_, registration->registration_source_);
}
// Collect post processors that need to be applied.
// We've already ensured the one-per-slot constraint, so now we can just
@ -243,95 +297,8 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
}
// Log out the graph we built if that's been requested.
if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
// It can happen that multiple threads attempt to construct a core config at
// once.
// This is benign - the first one wins and others are discarded.
// However, it messes up our logging and makes it harder to reason about the
// graph, so we add some protection here.
static Mutex* const m = new Mutex();
MutexLock lock(m);
// List the channel stack type (since we'll be repeatedly printing graphs in
// this loop).
LOG(INFO) << "ORDERED CHANNEL STACK "
<< grpc_channel_stack_type_string(type) << ":";
// First build up a map of filter -> file:line: strings, because it helps
// the readability of this log to get later fields aligned vertically.
std::map<const grpc_channel_filter*, std::string> loc_strs;
size_t max_loc_str_len = 0;
size_t max_filter_name_len = 0;
auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration,
&max_filter_name_len](
const grpc_channel_filter* filter) {
max_filter_name_len = std::max(
NameFromChannelFilter(filter).name().length(), max_filter_name_len);
const auto registration =
filter_to_registration[filter]->registration_source_;
absl::string_view file = registration.file();
auto slash_pos = file.rfind('/');
if (slash_pos != file.npos) {
file = file.substr(slash_pos + 1);
}
auto loc_str = absl::StrCat(file, ":", registration.line(), ":");
max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
loc_strs.emplace(filter, std::move(loc_str));
};
for (const auto& filter : filters) {
add_loc_str(filter.filter);
}
for (const auto& terminal : terminal_filters) {
add_loc_str(terminal.filter);
}
for (auto& loc_str : loc_strs) {
loc_str.second = absl::StrCat(
loc_str.second,
std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
}
// For each regular filter, print the location registered, the name of the
// filter, and if it needed to occur after some other filters list those
// filters too.
// Note that we use the processed after list here - earlier we turned Before
// registrations into After registrations and we used those converted
// registrations to build the final ordering.
// If you're trying to track down why 'A' is listed as after 'B', look at
// the following:
// - If A is registered with .After({B}), then A will be 'after' B here.
// - If B is registered with .Before({A}), then A will be 'after' B here.
// - If B is registered as BeforeAll, then A will be 'after' B here.
for (const auto& filter : filters) {
auto dep_it = original.find(filter.filter);
std::string after_str;
if (dep_it != original.end() && !dep_it->second.empty()) {
after_str = absl::StrCat(
std::string(
max_filter_name_len + 1 -
NameFromChannelFilter(filter.filter).name().length(),
' '),
"after ",
absl::StrJoin(
dep_it->second, ", ",
[](std::string* out, const grpc_channel_filter* filter) {
out->append(
std::string(NameFromChannelFilter(filter).name()));
}));
}
const auto filter_str =
absl::StrCat(" ", loc_strs[filter.filter],
NameFromChannelFilter(filter.filter), after_str);
LOG(INFO) << filter_str;
}
// Finally list out the terminal filters and where they were registered
// from.
for (const auto& terminal : terminal_filters) {
const auto filter_str = absl::StrCat(
" ", loc_strs[terminal.filter],
NameFromChannelFilter(terminal.filter),
std::string(
max_filter_name_len + 1 -
NameFromChannelFilter(terminal.filter).name().length(),
' '),
"[terminal]");
LOG(INFO) << filter_str;
}
PrintChannelStackTrace(type, registrations, dependencies, filters,
terminal_filters);
}
// Check if there are no terminal filters: this would be an error.
// GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
@ -353,6 +320,95 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
std::move(post_processor_functions)};
};
void ChannelInit::PrintChannelStackTrace(
grpc_channel_stack_type type,
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
const DependencyTracker& dependencies, const std::vector<Filter>& filters,
const std::vector<Filter>& terminal_filters) {
// It can happen that multiple threads attempt to construct a core config at
// once.
// This is benign - the first one wins and others are discarded.
// However, it messes up our logging and makes it harder to reason about the
// graph, so we add some protection here.
static Mutex* const m = new Mutex();
MutexLock lock(m);
// List the channel stack type (since we'll be repeatedly printing graphs in
// this loop).
LOG(INFO) << "ORDERED CHANNEL STACK " << grpc_channel_stack_type_string(type)
<< ":";
// First build up a map of filter -> file:line: strings, because it helps
// the readability of this log to get later fields aligned vertically.
absl::flat_hash_map<UniqueTypeName, std::string> loc_strs;
size_t max_loc_str_len = 0;
size_t max_filter_name_len = 0;
auto add_loc_str = [&max_loc_str_len, &loc_strs, &registrations,
&max_filter_name_len](UniqueTypeName name) {
max_filter_name_len = std::max(name.name().length(), max_filter_name_len);
for (const auto& registration : registrations) {
if (registration->name_ == name) {
auto source = registration->registration_source_;
absl::string_view file = source.file();
auto slash_pos = file.rfind('/');
if (slash_pos != file.npos) {
file = file.substr(slash_pos + 1);
}
auto loc_str = absl::StrCat(file, ":", source.line(), ":");
max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
loc_strs.emplace(name, std::move(loc_str));
break;
}
}
};
for (const auto& filter : filters) {
add_loc_str(filter.name);
}
for (const auto& terminal : terminal_filters) {
add_loc_str(terminal.name);
}
for (auto& loc_str : loc_strs) {
loc_str.second = absl::StrCat(
loc_str.second,
std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
}
// For each regular filter, print the location registered, the name of the
// filter, and if it needed to occur after some other filters list those
// filters too.
// Note that we use the processed after list here - earlier we turned Before
// registrations into After registrations and we used those converted
// registrations to build the final ordering.
// If you're trying to track down why 'A' is listed as after 'B', look at
// the following:
// - If A is registered with .After({B}), then A will be 'after' B here.
// - If B is registered with .Before({A}), then A will be 'after' B here.
// - If B is registered as BeforeAll, then A will be 'after' B here.
for (const auto& filter : filters) {
auto after = dependencies.DependenciesFor(filter.name);
std::string after_str;
if (!after.empty()) {
after_str = absl::StrCat(
std::string(max_filter_name_len + 1 - filter.name.name().length(),
' '),
"after ", absl::StrJoin(after, ", "));
} else {
after_str =
std::string(max_filter_name_len - filter.name.name().length(), ' ');
}
LOG(INFO) << " " << loc_strs[filter.name] << filter.name << after_str
<< " [" << filter.ordering << "/" << filter.version << "]";
}
// Finally list out the terminal filters and where they were registered
// from.
for (const auto& terminal : terminal_filters) {
const auto filter_str = absl::StrCat(
" ", loc_strs[terminal.name], terminal.name,
std::string(max_filter_name_len + 1 - terminal.name.name().length(),
' '),
"[terminal]");
LOG(INFO) << filter_str;
}
}
ChannelInit ChannelInit::Builder::Build() {
ChannelInit result;
for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
@ -373,6 +429,7 @@ bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
const auto& stack_config = stack_configs_[builder->channel_stack_type()];
for (const auto& filter : stack_config.filters) {
if (SkipV2(filter.version)) continue;
if (!filter.CheckPredicates(builder->channel_args())) continue;
builder->AppendFilter(filter.filter);
}
@ -394,13 +451,13 @@ bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
absl::StrAppend(&error, " No terminal filters were registered");
} else {
for (const auto& terminator : stack_config.terminators) {
absl::StrAppend(
&error, " ", NameFromChannelFilter(terminator.filter),
" registered @ ", terminator.registration_source.file(), ":",
terminator.registration_source.line(), ": enabled = ",
terminator.CheckPredicates(builder->channel_args()) ? "true"
: "false",
"\n");
absl::StrAppend(&error, " ", terminator.name, " registered @ ",
terminator.registration_source.file(), ":",
terminator.registration_source.line(), ": enabled = ",
terminator.CheckPredicates(builder->channel_args())
? "true"
: "false",
"\n");
}
}
LOG(ERROR) << error;
@ -417,12 +474,11 @@ void ChannelInit::AddToInterceptionChainBuilder(
const auto& stack_config = stack_configs_[type];
// Based on predicates build a list of filters to include in this segment.
for (const auto& filter : stack_config.filters) {
if (filter.skip_v3) continue;
if (SkipV3(filter.version)) continue;
if (!filter.CheckPredicates(builder.channel_args())) continue;
if (filter.filter_adder == nullptr) {
builder.Fail(absl::InvalidArgumentError(
absl::StrCat("Filter ", NameFromChannelFilter(filter.filter),
" has no v3-callstack vtable")));
absl::StrCat("Filter ", filter.name, " has no v3-callstack vtable")));
return;
}
filter.filter_adder(builder);

@ -23,6 +23,7 @@
#include <initializer_list>
#include <memory>
#include <ostream>
#include <utility>
#include <vector>
@ -65,6 +66,53 @@ namespace grpc_core {
extern UniqueTypeName (*NameFromChannelFilter)(const grpc_channel_filter*);
class ChannelInit {
private:
// Version constraints: filters can be registered against a specific version
// of the stack (V2 || V3), or registered for any stack.
enum class Version : uint8_t {
kAny,
kV2,
kV3,
};
static const char* VersionToString(Version version) {
switch (version) {
case Version::kAny:
return "Any";
case Version::kV2:
return "V2";
case Version::kV3:
return "V3";
}
return "Unknown";
}
template <typename Sink>
friend void AbslStringify(Sink& sink, Version version) {
sink.Append(VersionToString(version));
}
friend std::ostream& operator<<(std::ostream& out, Version version) {
return out << VersionToString(version);
}
static bool SkipV3(Version version) {
switch (version) {
case Version::kAny:
case Version::kV3:
return false;
case Version::kV2:
return true;
}
GPR_UNREACHABLE_CODE(return false);
}
static bool SkipV2(Version version) {
switch (version) {
case Version::kAny:
case Version::kV2:
return false;
case Version::kV3:
return true;
}
GPR_UNREACHABLE_CODE(return false);
}
public:
// Predicate for if a filter registration applies
using InclusionPredicate = absl::AnyInvocable<bool(const ChannelArgs&) const>;
@ -80,15 +128,61 @@ class ChannelInit {
kXdsChannelStackModifier,
kCount
};
static const char* PostProcessorSlotName(PostProcessorSlot slot) {
switch (slot) {
case PostProcessorSlot::kAuthSubstitution:
return "AuthSubstitution";
case PostProcessorSlot::kXdsChannelStackModifier:
return "XdsChannelStackModifier";
case PostProcessorSlot::kCount:
return "---count---";
}
return "Unknown";
}
template <typename Sink>
friend void AbslStringify(Sink& sink, PostProcessorSlot slot) {
sink.Append(PostProcessorSlotName(slot));
}
// Ordering priorities.
// Most filters should use the kDefault priority.
// Filters that need to appear before the default priority should use kTop,
// filters that need to appear later should use the kBottom priority.
// Explicit before/after ordering between filters dominates: eg, if a filter
// with kBottom priority is marked as *BEFORE* a kTop filter, then the first
// filter will appear before the second.
// It is an error to have two filters with kTop (or two with kBottom)
// available at the same time. If this occurs, the filters should be
// explicitly marked with a before/after relationship.
enum class Ordering : uint8_t { kTop, kDefault, kBottom };
static const char* OrderingToString(Ordering ordering) {
switch (ordering) {
case Ordering::kTop:
return "Top";
case Ordering::kDefault:
return "Default";
case Ordering::kBottom:
return "Bottom";
}
return "Unknown";
}
template <typename Sink>
friend void AbslStringify(Sink& sink, Ordering ordering) {
sink.Append(OrderingToString(ordering));
};
friend std::ostream& operator<<(std::ostream& out, Ordering ordering) {
return out << OrderingToString(ordering);
}
class FilterRegistration {
public:
// TODO(ctiller): Remove grpc_channel_filter* arg when that can be
// deprecated (once filter stack is removed).
explicit FilterRegistration(const grpc_channel_filter* filter,
explicit FilterRegistration(UniqueTypeName name,
const grpc_channel_filter* filter,
FilterAdder filter_adder,
SourceLocation registration_source)
: filter_(filter),
: name_(name),
filter_(filter),
filter_adder_(filter_adder),
registration_source_(registration_source) {}
FilterRegistration(const FilterRegistration&) = delete;
@ -99,14 +193,14 @@ class ChannelInit {
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& After() {
return After({&Filter::kFilter});
return After({UniqueTypeNameFor<Filter>()});
}
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& Before() {
return Before({&Filter::kFilter});
return Before({UniqueTypeNameFor<Filter>()});
}
// Ensure that this filter is placed *after* the filters listed here.
@ -114,15 +208,13 @@ class ChannelInit {
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& After(
std::initializer_list<const grpc_channel_filter*> filters);
FilterRegistration& After(std::initializer_list<UniqueTypeName> filters);
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& Before(
std::initializer_list<const grpc_channel_filter*> filters);
FilterRegistration& Before(std::initializer_list<UniqueTypeName> filters);
// Add a predicate for this filters inclusion.
// If the predicate returns true the filter will be included in the stack.
// Predicates do not affect the ordering of the filter stack: we first
@ -157,20 +249,42 @@ class ChannelInit {
// stack.
FilterRegistration& ExcludeFromMinimalStack();
FilterRegistration& SkipV3() {
skip_v3_ = true;
CHECK_EQ(version_, Version::kAny);
version_ = Version::kV2;
return *this;
}
FilterRegistration& SkipV2() {
CHECK_EQ(version_, Version::kAny);
version_ = Version::kV3;
return *this;
}
// Request this filter be placed as high as possible in the stack (given
// before/after constraints).
FilterRegistration& FloatToTop() {
CHECK_EQ(ordering_, Ordering::kDefault);
ordering_ = Ordering::kTop;
return *this;
}
// Request this filter be placed as low as possible in the stack (given
// before/after constraints).
FilterRegistration& SinkToBottom() {
CHECK_EQ(ordering_, Ordering::kDefault);
ordering_ = Ordering::kBottom;
return *this;
}
private:
friend class ChannelInit;
const UniqueTypeName name_;
const grpc_channel_filter* const filter_;
const FilterAdder filter_adder_;
std::vector<const grpc_channel_filter*> after_;
std::vector<const grpc_channel_filter*> before_;
std::vector<UniqueTypeName> after_;
std::vector<UniqueTypeName> before_;
std::vector<InclusionPredicate> predicates_;
bool terminal_ = false;
bool before_all_ = false;
bool skip_v3_ = false;
Version version_ = Version::kAny;
Ordering ordering_ = Ordering::kDefault;
SourceLocation registration_source_;
};
@ -183,14 +297,22 @@ class ChannelInit {
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
UniqueTypeName name,
const grpc_channel_filter* filter,
FilterAdder filter_adder = nullptr,
SourceLocation registration_source = {});
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
SourceLocation registration_source = {}) {
CHECK(filter != nullptr);
return RegisterFilter(type, NameFromChannelFilter(filter), filter,
nullptr, registration_source);
}
template <typename Filter>
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
return RegisterFilter(
type, &Filter::kFilter,
type, UniqueTypeNameFor<Filter>(), &Filter::kFilter,
[](InterceptionChainBuilder& builder) { builder.Add<Filter>(); },
registration_source);
}
@ -199,8 +321,7 @@ class ChannelInit {
template <typename Filter>
FilterRegistration& RegisterV2Filter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
return RegisterFilter(type, &Filter::kFilter, nullptr,
registration_source)
return RegisterFilter(type, &Filter::kFilter, registration_source)
.SkipV3();
}
@ -241,20 +362,27 @@ class ChannelInit {
using CreatedType =
typename decltype(T::Create(ChannelArgs(), {}))::value_type;
class DependencyTracker;
struct Filter {
Filter(const grpc_channel_filter* filter, FilterAdder filter_adder,
std::vector<InclusionPredicate> predicates, bool skip_v3,
Filter(UniqueTypeName name, const grpc_channel_filter* filter,
FilterAdder filter_adder, std::vector<InclusionPredicate> predicates,
Version version, Ordering ordering,
SourceLocation registration_source)
: filter(filter),
: name(name),
filter(filter),
filter_adder(filter_adder),
predicates(std::move(predicates)),
registration_source(registration_source),
skip_v3(skip_v3) {}
version(version),
ordering(ordering) {}
UniqueTypeName name;
const grpc_channel_filter* filter;
const FilterAdder filter_adder;
std::vector<InclusionPredicate> predicates;
SourceLocation registration_source;
bool skip_v3 = false;
Version version;
Ordering ordering;
bool CheckPredicates(const ChannelArgs& args) const;
};
struct StackConfig {
@ -268,6 +396,12 @@ class ChannelInit {
static StackConfig BuildStackConfig(
const std::vector<std::unique_ptr<FilterRegistration>>& registrations,
PostProcessor* post_processors, grpc_channel_stack_type type);
static void PrintChannelStackTrace(
grpc_channel_stack_type type,
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
const DependencyTracker& dependencies, const std::vector<Filter>& filters,
const std::vector<Filter>& terminal_filters);
};
} // namespace grpc_core

@ -35,6 +35,16 @@ void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data) {
}
}
ServerMetadataHandle RunServerTrailingMetadata(
absl::Span<const ServerTrailingMetadataOperator> ops, void* call_data,
ServerMetadataHandle md) {
for (auto& op : ops) {
md = op.server_trailing_metadata(Offset(call_data, op.call_offset),
op.channel_data, std::move(md));
}
return md;
}
template <typename T>
OperationExecutor<T>::~OperationExecutor() {
if (promise_data_ != nullptr) {
@ -44,8 +54,8 @@ OperationExecutor<T>::~OperationExecutor() {
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::Start(
const Layout<FallibleOperator<T>>* layout, T input, void* call_data) {
Poll<ResultOr<T>> OperationExecutor<T>::Start(const Layout<T>* layout, T input,
void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
@ -101,75 +111,11 @@ Poll<ResultOr<T>> OperationExecutor<T>::ContinueStep(void* call_data) {
return Pending{};
}
template <typename T>
InfallibleOperationExecutor<T>::~InfallibleOperationExecutor() {
if (promise_data_ != nullptr) {
ops_->early_destroy(promise_data_);
gpr_free_aligned(promise_data_);
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Start(
const Layout<InfallibleOperator<T>>* layout, T input, void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
// No call state ==> instantaneously ready
auto r = InitStep(std::move(input), call_data);
CHECK(r.ready());
return r;
}
promise_data_ =
gpr_malloc_aligned(layout->promise_size, layout->promise_alignment);
return InitStep(std::move(input), call_data);
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::InitStep(T input, void* call_data) {
while (true) {
if (ops_ == end_ops_) {
return input;
}
auto p =
ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset),
ops_->channel_data, std::move(input));
if (auto* r = p.value_if_ready()) {
input = std::move(*r);
++ops_;
continue;
}
return Pending{};
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Step(void* call_data) {
DCHECK_NE(promise_data_, nullptr);
auto p = ContinueStep(call_data);
if (p.ready()) {
gpr_free_aligned(promise_data_);
promise_data_ = nullptr;
}
return p;
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::ContinueStep(void* call_data) {
auto p = ops_->poll(promise_data_);
if (auto* r = p.value_if_ready()) {
++ops_;
return InitStep(std::move(*r), call_data);
}
return Pending{};
}
// Explicit instantiations of some types used in filters.h
// We'll need to add ServerMetadataHandle to this when it becomes different
// to ClientMetadataHandle
template class OperationExecutor<ClientMetadataHandle>;
template class OperationExecutor<MessageHandle>;
template class InfallibleOperationExecutor<ServerMetadataHandle>;
} // namespace filters_detail
@ -285,7 +231,7 @@ RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
// in the same order
data_.server_initial_metadata.Reverse();
data_.server_to_client_messages.Reverse();
data_.server_trailing_metadata.Reverse();
absl::c_reverse(data_.server_trailing_metadata);
return RefCountedPtr<Stack>(new Stack(std::move(data_)));
}

@ -173,11 +173,10 @@ struct ResultOr {
};
// One filter operation metadata
// Given a value of type V, produces a promise of type R.
template <typename R, typename V>
// Given a value of type T, produces a promise of type ResultOr<T>.
template <typename T>
struct Operator {
using Result = R;
using Arg = V;
using Arg = T;
// Pointer to corresponding channel data for this filter
void* channel_data;
// Offset of the call data for this filter within the call data memory
@ -185,13 +184,13 @@ struct Operator {
// Initialize the promise data for this filter, and poll once.
// Return the result of the poll.
// If the promise finishes, also destroy the promise data!
Poll<R> (*promise_init)(void* promise_data, void* call_data,
void* channel_data, V value);
Poll<ResultOr<T>> (*promise_init)(void* promise_data, void* call_data,
void* channel_data, T value);
// Poll the promise data for this filter.
// If the promise finishes, also destroy the promise data!
// Note that if the promise always finishes on the first poll, then supplying
// this method is unnecessary (as it will never be called).
Poll<R> (*poll)(void* promise_data);
Poll<ResultOr<T>> (*poll)(void* promise_data);
// Destroy the promise data for this filter for an in-progress operation
// before the promise finishes.
// Note that if the promise always finishes on the first poll, then supplying
@ -207,24 +206,19 @@ struct HalfCloseOperator {
void (*half_close)(void* call_data, void* channel_data);
};
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data);
struct ServerTrailingMetadataOperator {
// Pointer to corresponding channel data for this filter
void* channel_data;
// Offset of the call data for this filter within the call data memory
size_t call_offset;
ServerMetadataHandle (*server_trailing_metadata)(
void* call_data, void* channel_data, ServerMetadataHandle metadata);
};
// We divide operations into fallible and infallible.
// Fallible operations can fail, and that failure terminates the call.
// Infallible operations cannot fail.
// Fallible operations are used for client initial, and server initial metadata,
// and messages.
// Infallible operations are used for server trailing metadata.
// (This is because server trailing metadata occurs when the call is finished -
// and so we couldn't possibly become more finished - and also because it's the
// preferred representation of failure anyway!)
// An operation that could fail: takes a T argument, produces a ResultOr<T>
template <typename T>
using FallibleOperator = Operator<ResultOr<T>, T>;
// And one that cannot: takes a T argument, produces a T
template <typename T>
using InfallibleOperator = Operator<T, T>;
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data);
ServerMetadataHandle RunServerTrailingMetadata(
absl::Span<const ServerTrailingMetadataOperator> ops, void* call_data,
ServerMetadataHandle md);
// One call finalizer
struct Finalizer {
@ -236,19 +230,20 @@ struct Finalizer {
// A layout of operations for a given filter stack
// This includes which operations, how much memory is required, what alignment.
template <typename Op>
template <typename T>
struct Layout {
size_t promise_size = 0;
size_t promise_alignment = 0;
std::vector<Op> ops;
std::vector<Operator<T>> ops;
void Add(size_t filter_promise_size, size_t filter_promise_alignment, Op op) {
void Add(size_t filter_promise_size, size_t filter_promise_alignment,
Operator<T> op) {
promise_size = std::max(promise_size, filter_promise_size);
promise_alignment = std::max(promise_alignment, filter_promise_alignment);
ops.push_back(op);
}
void Reverse() { std::reverse(ops.begin(), ops.end()); }
void Reverse() { absl::c_reverse(ops); }
};
// AddOp and friends
@ -268,16 +263,7 @@ struct AddOpImpl;
template <typename FunctionImpl, FunctionImpl impl, typename FilterType,
typename T>
void AddOp(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset,
to);
}
template <typename FunctionImpl, FunctionImpl impl, typename FilterType,
typename T>
void AddOp(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
void AddOp(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset,
to);
}
@ -308,13 +294,73 @@ template <typename FilterType>
void AddHalfClose(FilterType*, size_t, const NoInterceptor*,
std::vector<HalfCloseOperator>&) {}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(ServerMetadata&),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata);
return metadata;
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(ServerMetadata&, FilterType*),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void* channel_data, ServerMetadataHandle metadata) {
static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata,
static_cast<FilterType*>(channel_data));
return metadata;
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
absl::Status (FilterType::Call::*)(ServerMetadata&),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
auto r = static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata);
if (r.ok()) return metadata;
return CancelledServerMetadataFromStatus(r);
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
ServerMetadataHandle (FilterType::Call::*)(ServerMetadataHandle),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
return static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(std::move(metadata));
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(FilterType*, size_t, const NoInterceptor*,
std::vector<ServerTrailingMetadataOperator>&) {}
// const NoInterceptor $EVENT
// These do nothing, and specifically DO NOT add an operation to the layout.
// Supported for fallible & infallible operations.
template <typename FilterType, typename T, const NoInterceptor* which>
struct AddOpImpl<FilterType, T, const NoInterceptor*, which> {
static void Add(FilterType*, size_t, Layout<FallibleOperator<T>>&) {}
static void Add(FilterType*, size_t, Layout<InfallibleOperator<T>>&) {}
static void Add(FilterType*, size_t, Layout<T>&) {}
};
// void $INTERCEPTOR_NAME($VALUE_TYPE&)
@ -322,10 +368,9 @@ template <typename FilterType, typename T,
void (FilterType::Call::*impl)(typename T::element_type&)>
struct AddOpImpl<FilterType, T,
void (FilterType::Call::*)(typename T::element_type&), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -337,21 +382,6 @@ struct AddOpImpl<FilterType, T,
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value);
return std::move(value);
},
nullptr,
nullptr,
});
}
};
// void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
@ -361,10 +391,9 @@ template <typename FilterType, typename T,
struct AddOpImpl<
FilterType, T,
void (FilterType::Call::*)(typename T::element_type&, FilterType*), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -377,33 +406,16 @@ struct AddOpImpl<
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value, static_cast<FilterType*>(channel_data));
return std::move(value);
},
nullptr,
nullptr,
});
}
};
// $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
template <typename FilterType, typename T,
T (FilterType::Call::*impl)(T, FilterType*)>
struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -417,24 +429,6 @@ struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> {
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value, static_cast<FilterType*>(channel_data));
return (
static_cast<typename FilterType::Call*>(call_data)->*impl)(
std::move(value), static_cast<FilterType*>(channel_data));
},
nullptr,
nullptr,
});
}
};
// absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&)
@ -443,11 +437,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -462,24 +455,6 @@ struct AddOpImpl<FilterType, T,
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<T> {
auto r =
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value);
if (r.ok()) return std::move(value);
return StatusCast<ServerMetadataHandle>(std::move(r));
},
nullptr,
nullptr,
});
}
};
// absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&)
@ -489,11 +464,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<
FilterType, T,
absl::Status (FilterType::Call::*)(const typename T::element_type&), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -518,11 +492,10 @@ struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(typename T::element_type&,
FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -548,11 +521,10 @@ struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(
const typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -576,11 +548,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<FilterType, T,
absl::StatusOr<T> (FilterType::Call::*)(T, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -606,11 +577,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -635,11 +605,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
const typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -664,11 +633,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -694,11 +662,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
const typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -722,8 +689,7 @@ template <typename FilterType, typename T, typename R,
struct AddOpImpl<
FilterType, T, R (FilterType::Call::*)(typename T::element_type&), impl,
absl::enable_if_t<std::is_same<absl::Status, PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data, FilterType*)
@ -746,7 +712,7 @@ struct AddOpImpl<
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -775,8 +741,7 @@ struct AddOpImpl<
R (FilterType::Call::*)(typename T::element_type&, FilterType*), impl,
absl::enable_if_t<!std::is_same<R, absl::Status>::value &&
std::is_same<absl::Status, PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data,
@ -801,7 +766,7 @@ struct AddOpImpl<
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -829,8 +794,7 @@ template <typename FilterType, typename T, typename R,
struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
absl::enable_if_t<std::is_same<absl::StatusOr<T>,
PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data,
@ -851,7 +815,7 @@ struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -893,12 +857,12 @@ struct StackData {
// For each kind of operation, a layout of the operations for this call.
// (there's some duplicate data here, and that's ok: we want to avoid
// pointer chasing as much as possible when executing a call)
Layout<FallibleOperator<ClientMetadataHandle>> client_initial_metadata;
Layout<FallibleOperator<ServerMetadataHandle>> server_initial_metadata;
Layout<FallibleOperator<MessageHandle>> client_to_server_messages;
Layout<ClientMetadataHandle> client_initial_metadata;
Layout<ServerMetadataHandle> server_initial_metadata;
Layout<MessageHandle> client_to_server_messages;
std::vector<HalfCloseOperator> client_to_server_half_close;
Layout<FallibleOperator<MessageHandle>> server_to_client_messages;
Layout<InfallibleOperator<ServerMetadataHandle>> server_trailing_metadata;
Layout<MessageHandle> server_to_client_messages;
std::vector<ServerTrailingMetadataOperator> server_trailing_metadata;
// A list of finalizers for this call.
// We use a bespoke data structure here because finalizers can never be
// asynchronous.
@ -1036,9 +1000,9 @@ struct StackData {
template <typename FilterType>
void AddServerTrailingMetadataOp(FilterType* channel_data,
size_t call_offset) {
AddOp<decltype(&FilterType::Call::OnServerTrailingMetadata),
&FilterType::Call::OnServerTrailingMetadata>(
channel_data, call_offset, server_trailing_metadata);
AddServerTrailingMetadata(channel_data, call_offset,
&FilterType::Call::OnServerTrailingMetadata,
server_trailing_metadata);
}
// Finalizer interception adders
@ -1110,8 +1074,7 @@ class OperationExecutor {
// Start executing a layout. May allocate space to store the relevant promise.
// Returns the result of the first poll.
// If the promise finishes, also destroy the promise data.
Poll<ResultOr<T>> Start(const Layout<FallibleOperator<T>>* layout, T input,
void* call_data);
Poll<ResultOr<T>> Start(const Layout<T>* layout, T input, void* call_data);
// Continue executing a layout. Returns the result of the next poll.
// If the promise finishes, also destroy the promise data.
Poll<ResultOr<T>> Step(void* call_data);
@ -1133,63 +1096,8 @@ class OperationExecutor {
Poll<ResultOr<T>> ContinueStep(void* call_data);
void* promise_data_ = nullptr;
const FallibleOperator<T>* ops_;
const FallibleOperator<T>* end_ops_;
};
// Per OperationExecutor, but for infallible operation sequences.
template <typename T>
class InfallibleOperationExecutor {
public:
InfallibleOperationExecutor() = default;
~InfallibleOperationExecutor();
InfallibleOperationExecutor(const InfallibleOperationExecutor&) = delete;
InfallibleOperationExecutor& operator=(const InfallibleOperationExecutor&) =
delete;
InfallibleOperationExecutor(InfallibleOperationExecutor&& other) noexcept
: ops_(other.ops_), end_ops_(other.end_ops_) {
// Movable iff we're not running.
DCHECK_EQ(other.promise_data_, nullptr);
}
InfallibleOperationExecutor& operator=(
InfallibleOperationExecutor&& other) noexcept {
DCHECK_EQ(other.promise_data_, nullptr);
DCHECK_EQ(promise_data_, nullptr);
ops_ = other.ops_;
end_ops_ = other.end_ops_;
return *this;
}
// IsRunning() is true if we're currently executing a sequence of operations.
bool IsRunning() const { return promise_data_ != nullptr; }
// Start executing a layout. May allocate space to store the relevant promise.
// Returns the result of the first poll.
// If the promise finishes, also destroy the promise data.
Poll<T> Start(const Layout<InfallibleOperator<T>>* layout, T input,
void* call_data);
// Continue executing a layout. Returns the result of the next poll.
// If the promise finishes, also destroy the promise data.
Poll<T> Step(void* call_data);
private:
// Start polling on the current step of the layout.
// `input` is the current value (either the input to the first step, or the
// so far transformed value)
// `call_data` is the call data for the filter stack.
// If this op finishes immediately then we iterative move to the next step.
// If we reach the end up the ops, we return the overall poll result,
// otherwise we return Pending.
Poll<T> InitStep(T input, void* call_data);
// Continue polling on the current step of the layout.
// Called on the next poll after InitStep returns pending.
// If the promise is still pending, returns this.
// If the promise completes we call into InitStep to continue execution
// through the filters.
Poll<T> ContinueStep(void* call_data);
void* promise_data_ = nullptr;
const InfallibleOperator<T>* ops_;
const InfallibleOperator<T>* end_ops_;
const Operator<T>* ops_;
const Operator<T>* end_ops_;
};
template <typename Fn>
@ -1383,8 +1291,7 @@ class CallFilters {
template <typename Output, typename Input,
Input(CallFilters::*input_location),
filters_detail::Layout<filters_detail::FallibleOperator<Input>>(
filters_detail::StackData::*layout),
filters_detail::Layout<Input>(filters_detail::StackData::*layout),
void (CallState::*on_done)()>
auto RunExecutor() {
DCHECK_NE((this->*input_location).get(), nullptr);
@ -1513,33 +1420,15 @@ class CallFilters {
// Client: Fetch server trailing metadata
// Returns a promise that resolves to ServerMetadataHandle
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() {
return Seq(
return Map(
[this]() { return call_state_.PollServerTrailingMetadataAvailable(); },
[this](Empty) {
filters_detail::InfallibleOperationExecutor<ServerMetadataHandle>
executor;
return [this, executor = std::move(
executor)]() mutable -> Poll<ServerMetadataHandle> {
auto finish_step = [this](Poll<ServerMetadataHandle> p)
-> Poll<ServerMetadataHandle> {
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
call_state_.FinishPullServerTrailingMetadata();
return std::move(*r);
};
if (push_server_trailing_metadata_ != nullptr) {
// If no stack has been set, we can just return the result of the
// call
if (stack_ == nullptr) {
call_state_.FinishPullServerTrailingMetadata();
return std::move(push_server_trailing_metadata_);
}
return finish_step(executor.Start(
&(stack_->data_.server_trailing_metadata),
std::move(push_server_trailing_metadata_), call_data_));
}
return finish_step(executor.Step(call_data_));
};
auto result = std::move(push_server_trailing_metadata_);
call_state_.FinishPullServerTrailingMetadata();
if (call_data_ == nullptr) return result;
return filters_detail::RunServerTrailingMetadata(
stack_->data_.server_trailing_metadata, call_data_,
std::move(result));
});
}
// Server: Wait for server trailing metadata to have been sent

@ -44,18 +44,6 @@ void gpr_unreachable_code(const char* reason, const char* file, int line) {
grpc_core::SourceLocation(file, line));
}
const char* gpr_log_severity_string(gpr_log_severity severity) {
switch (severity) {
case GPR_LOG_SEVERITY_DEBUG:
return "D";
case GPR_LOG_SEVERITY_INFO:
return "I";
case GPR_LOG_SEVERITY_ERROR:
return "E";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
int gpr_should_log(gpr_log_severity severity) {
switch (severity) {
case GPR_LOG_SEVERITY_ERROR:

@ -47,7 +47,7 @@ void RegisterOpenCensusPlugin() {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL,
&grpc::internal::OpenCensusClientFilter::kFilter)
.Before({&grpc_core::ClientLoggingFilter::kFilter});
.Before<grpc_core::ClientLoggingFilter>();
});
// Access measures to ensure they are initialized. Otherwise, creating a view

@ -18,6 +18,7 @@ from typing import Generic, Iterable, Mapping, NoReturn, Optional, Sequence
import grpc
from ._metadata import Metadata # pylint: disable=unused-import
from ._typing import DoneCallbackType
from ._typing import MetadataType
from ._typing import RequestType

@ -253,7 +253,6 @@ gpr_malloc_aligned_type gpr_malloc_aligned_import;
gpr_free_aligned_type gpr_free_aligned_import;
gpr_cpu_num_cores_type gpr_cpu_num_cores_import;
gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
gpr_log_severity_string_type gpr_log_severity_string_import;
gpr_log_type gpr_log_import;
gpr_should_log_type gpr_should_log_import;
gpr_log_message_type gpr_log_message_import;
@ -542,7 +541,6 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_free_aligned_import = (gpr_free_aligned_type) GetProcAddress(library, "gpr_free_aligned");
gpr_cpu_num_cores_import = (gpr_cpu_num_cores_type) GetProcAddress(library, "gpr_cpu_num_cores");
gpr_cpu_current_cpu_import = (gpr_cpu_current_cpu_type) GetProcAddress(library, "gpr_cpu_current_cpu");
gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string");
gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log");
gpr_should_log_import = (gpr_should_log_type) GetProcAddress(library, "gpr_should_log");
gpr_log_message_import = (gpr_log_message_type) GetProcAddress(library, "gpr_log_message");

@ -735,9 +735,6 @@ extern gpr_cpu_num_cores_type gpr_cpu_num_cores_import;
typedef unsigned(*gpr_cpu_current_cpu_type)(void);
extern gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
#define gpr_cpu_current_cpu gpr_cpu_current_cpu_import
typedef const char*(*gpr_log_severity_string_type)(gpr_log_severity severity);
extern gpr_log_severity_string_type gpr_log_severity_string_import;
#define gpr_log_severity_string gpr_log_severity_string_import
typedef void(*gpr_log_type)(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import

@ -14,7 +14,7 @@
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/core/call/yodel:grpc_yodel_test.bzl", "grpc_yodel_simple_test")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
grpc_package(name = "test/core/call")
@ -79,17 +79,9 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_client_call",
size = "small",
srcs = ["bm_client_call.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
"//:grpc",
"//src/core:default_event_engine",

@ -14,7 +14,7 @@
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//test/core/call/yodel:grpc_yodel_test.bzl", "grpc_yodel_simple_test")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
grpc_package(name = "test/core/client_channel")
@ -118,17 +118,9 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_client_channel",
size = "small",
srcs = ["bm_client_channel.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
"//:grpc",
"//src/core:default_event_engine",

@ -13,6 +13,7 @@
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
licenses(["notice"])
@ -109,20 +110,13 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "lock_free_event_test",
srcs = ["lock_free_event_test.cc"],
external_deps = [
"benchmark",
"gtest",
],
language = "C++",
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = True,
uses_polling = False,
deps = [
"//src/core:posix_event_engine",
"//src/core:posix_event_engine_closure",

@ -13,6 +13,7 @@
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
grpc_package(name = "test/core/experiments")
@ -31,16 +32,10 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_experiments",
srcs = ["bm_experiments.cc"],
external_deps = [
"benchmark",
"absl/container:btree",
],
deps = [
"//:grpc++",
"//src/core:channel_args",
"//src/core:experiments",
"//test/core/test_util:grpc_test_util",
],

@ -13,7 +13,7 @@
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
licenses(["notice"])
@ -120,13 +120,9 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_http_client_filter",
size = "small",
srcs = ["bm_http_client_filter.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
uses_polling = False,
deps = [
"//:grpc",
"//src/core:default_event_engine",

@ -415,6 +415,7 @@ grpc_cc_test(
external_deps = [
"gtest",
"absl/strings:str_format",
"absl/container:flat_hash_map",
],
language = "c++",
uses_event_engine = False,

@ -17,10 +17,14 @@
#include <iosfwd>
#include <map>
#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
using testing::Pair;
using testing::UnorderedElementsAre;
namespace grpc_core {
// Teach gtest to print names usefully.
@ -113,6 +117,23 @@ TEST(UniqueTypeNameTest, UniqueTypeNameHere) {
EXPECT_NE(name1, name2);
}
TEST(UniqueTypenameTest, Stringify) {
auto name = GRPC_UNIQUE_TYPE_NAME_HERE("bob");
EXPECT_EQ("bob", absl::StrFormat("%v", name));
}
TEST(UniqueTypeNameTest, Hash) {
auto name1 = GRPC_UNIQUE_TYPE_NAME_HERE("name");
auto name2 = GRPC_UNIQUE_TYPE_NAME_HERE("name");
auto name3 = GRPC_UNIQUE_TYPE_NAME_HERE("other");
absl::flat_hash_map<UniqueTypeName, int> m;
m[name1] = 1;
m[name2] = 2;
m[name3] = 3;
EXPECT_THAT(
m, UnorderedElementsAre(Pair(name1, 1), Pair(name2, 2), Pair(name3, 3)));
}
} // namespace
} // namespace grpc_core

@ -18,6 +18,7 @@ load(
"grpc_cc_test",
"grpc_package",
)
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
grpc_package(name = "test/core/load_balancing")
@ -194,21 +195,14 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "static_stride_scheduler_benchmark",
srcs = ["static_stride_scheduler_benchmark.cc"],
external_deps = [
"absl/algorithm:container",
"absl/log:check",
"benchmark",
],
language = "C++",
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:no_destruct",
"//src/core:static_stride_scheduler",

@ -14,7 +14,7 @@
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/core/test_util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
licenses(["notice"])
@ -686,17 +686,9 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_party",
size = "small",
srcs = ["bm_party.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
"//:grpc",
"//src/core:1999",

@ -99,7 +99,7 @@ TEST(ChannelInitTest, AfterConstraintsApply) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"))
.After({FilterNamed("foo")});
.After({FilterNamed("foo")->name});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
auto init = b.Build();
@ -110,7 +110,7 @@ TEST(ChannelInitTest, AfterConstraintsApply) {
TEST(ChannelInitTest, BeforeConstraintsApply) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"))
.Before({FilterNamed("bar")});
.Before({FilterNamed("bar")->name});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
@ -210,10 +210,71 @@ TEST(ChannelInitTest, CanPostProcessFilters) {
std::vector<std::string>({"foo", "aaa", "bar"}));
}
TEST(ChannelInitTest, OrderingConstraintsAreSatisfied) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).FloatToTop();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("a")).SinkToBottom();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_EQ(GetFilterNames(b.Build(), GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"c", "b", "a", "terminator"}));
}
TEST(ChannelInitTest, AmbiguousTopCrashes) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).FloatToTop();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b")).FloatToTop();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_DEATH_IF_SUPPORTED(b.Build(), "Ambiguous");
}
TEST(ChannelInitTest, ExplicitOrderingBetweenTopResolvesAmbiguity) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).FloatToTop();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b"))
.FloatToTop()
.After({FilterNamed("c")->name});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_EQ(GetFilterNames(b.Build(), GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"c", "b", "terminator"}));
}
TEST(ChannelInitTest, AmbiguousBottomCrashes) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).SinkToBottom();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b")).SinkToBottom();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_DEATH_IF_SUPPORTED(b.Build(), "Ambiguous");
}
TEST(ChannelInitTest, ExplicitOrderingBetweenBottomResolvesAmbiguity) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).SinkToBottom();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b"))
.SinkToBottom()
.After({FilterNamed("c")->name});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_EQ(GetFilterNames(b.Build(), GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"c", "b", "terminator"}));
}
TEST(ChannelInitTest, BottomCanComeBeforeTopWithExplicitOrdering) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("c")).FloatToTop();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("b"))
.SinkToBottom()
.Before({FilterNamed("c")->name});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
EXPECT_EQ(GetFilterNames(b.Build(), GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"b", "c", "terminator"}));
}
class TestFilter1 {
public:
explicit TestFilter1(int* p) : p_(p) {}
static absl::string_view TypeName() { return "TestFilter1"; }
static absl::StatusOr<std::unique_ptr<TestFilter1>> Create(
const ChannelArgs& args, ChannelFilter::Args) {
EXPECT_EQ(args.GetInt("foo"), 1);

@ -14,7 +14,7 @@
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/core/call/yodel:grpc_yodel_test.bzl", "grpc_yodel_simple_test")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
licenses(["notice"])
@ -241,17 +241,9 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_call_spine",
size = "small",
srcs = ["bm_call_spine.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
":call_spine_benchmarks",
"//:grpc",

@ -72,17 +72,16 @@ class MockActivity : public Activity, public Wakeable {
namespace filters_detail {
TEST(LayoutTest, Empty) {
Layout<FallibleOperator<ClientMetadataHandle>> l;
Layout<ClientMetadataHandle> l;
ASSERT_EQ(l.ops.size(), 0u);
EXPECT_EQ(l.promise_size, 0u);
EXPECT_EQ(l.promise_alignment, 0u);
}
TEST(LayoutTest, Add) {
Layout<FallibleOperator<ClientMetadataHandle>> l;
Layout<ClientMetadataHandle> l;
l.Add(1, 4,
FallibleOperator<ClientMetadataHandle>{&l, 120, nullptr, nullptr,
nullptr});
Operator<ClientMetadataHandle>{&l, 120, nullptr, nullptr, nullptr});
ASSERT_EQ(l.ops.size(), 1u);
EXPECT_EQ(l.promise_size, 1u);
EXPECT_EQ(l.promise_alignment, 4u);
@ -872,7 +871,7 @@ TEST(StackDataTest, InstantServerToClientMessagesReturningVoid) {
EXPECT_EQ(r.value().ok->flags(), 1u);
}
TEST(StackDataTest, InstantServerTrailingMetadataReturningVoid) {
TEST(StackDataTest, ServerTrailingMetadataReturningVoid) {
struct Filter1 {
struct Call {
void OnServerTrailingMetadata(ServerMetadata& md) {
@ -888,27 +887,20 @@ TEST(StackDataTest, InstantServerTrailingMetadataReturningVoid) {
d.AddServerTrailingMetadataOp(&f1, call_offset);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata.ops[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata.ops[0].channel_data, &f1);
// Instant => no poll/early destroy
EXPECT_EQ(d.server_trailing_metadata.ops[0].poll, nullptr);
EXPECT_EQ(d.server_trailing_metadata.ops[0].early_destroy, nullptr);
// Check promise init
ASSERT_EQ(d.server_trailing_metadata.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata[0].channel_data, &f1);
// Check operation
auto arena = SimpleArenaAllocator()->MakeArena();
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
char call_data;
auto r = d.server_trailing_metadata.ops[0].promise_init(
nullptr, &call_data, d.server_trailing_metadata.ops[0].channel_data,
std::move(md));
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"hello");
auto r = d.server_trailing_metadata[0].server_trailing_metadata(
&call_data, d.server_trailing_metadata[0].channel_data, std::move(md));
EXPECT_EQ(r->get_pointer(HttpPathMetadata())->as_string_view(), "hello");
}
TEST(StackDataTest,
InstantServerTrailingMetadataReturningVoidTakingChannelPtr) {
TEST(StackDataTest, ServerTrailingMetadataReturningVoidTakingChannelPtr) {
struct Filter1 {
struct Call {
void OnServerTrailingMetadata(ServerMetadata& md, Filter1* p) {
@ -926,23 +918,17 @@ TEST(StackDataTest,
d.AddServerTrailingMetadataOp(&f1, call_offset);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata.ops[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata.ops[0].channel_data, &f1);
// Instant => no poll/early destroy
EXPECT_EQ(d.server_trailing_metadata.ops[0].poll, nullptr);
EXPECT_EQ(d.server_trailing_metadata.ops[0].early_destroy, nullptr);
// Check promise init
ASSERT_EQ(d.server_trailing_metadata.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata[0].channel_data, &f1);
// Check operation
auto arena = SimpleArenaAllocator()->MakeArena();
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
char call_data;
auto r = d.server_trailing_metadata.ops[0].promise_init(
nullptr, &call_data, d.server_trailing_metadata.ops[0].channel_data,
std::move(md));
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"hello");
auto r = d.server_trailing_metadata[0].server_trailing_metadata(
&call_data, d.server_trailing_metadata[0].channel_data, std::move(md));
EXPECT_EQ(r->get_pointer(HttpPathMetadata())->as_string_view(), "hello");
EXPECT_THAT(f1.v, ::testing::ElementsAre(42));
}
@ -964,13 +950,13 @@ TEST(StackBuilderTest, AddOnServerTrailingMetadata) {
[x = std::make_unique<int>(42)](ServerMetadata&) { EXPECT_EQ(*x, 42); });
auto stack = b.Build();
const auto& data = CallFilters::StackTestSpouse().StackDataFrom(*stack);
ASSERT_EQ(data.server_trailing_metadata.ops.size(), 1u);
ASSERT_EQ(data.server_trailing_metadata.size(), 1u);
ASSERT_EQ(data.client_initial_metadata.ops.size(), 0u);
ASSERT_EQ(data.client_to_server_messages.ops.size(), 0u);
ASSERT_EQ(data.server_to_client_messages.ops.size(), 0u);
ASSERT_EQ(data.server_initial_metadata.ops.size(), 0u);
EXPECT_EQ(data.server_trailing_metadata.ops[0].call_offset, 0);
EXPECT_NE(data.server_trailing_metadata.ops[0].channel_data, nullptr);
EXPECT_EQ(data.server_trailing_metadata[0].call_offset, 0);
EXPECT_NE(data.server_trailing_metadata[0].channel_data, nullptr);
}
///////////////////////////////////////////////////////////////////////////////
@ -1118,56 +1104,6 @@ TEST(OperationExecutorTest, PromiseTwo) {
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////
// InfallibleOperationExecutor
namespace filters_detail {
TEST(InfallibleOperationExecutor, NoOp) {
OperationExecutor<ServerMetadataHandle> pipe;
EXPECT_FALSE(pipe.IsRunning());
}
TEST(InfallibleOperationExecutor, InstantTwo) {
class Filter1 {
public:
class Call {
public:
void OnServerTrailingMetadata(ClientMetadata& md) {
if (md.get_pointer(HttpPathMetadata()) != nullptr) {
md.Set(HttpPathMetadata(), Slice::FromStaticString("world"));
} else {
md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
}
}
};
};
StackData d;
Filter1 f1;
Filter1 f2;
const size_t call_offset1 = d.AddFilter(&f1);
const size_t call_offset2 = d.AddFilter(&f2);
d.AddServerTrailingMetadataOp(&f1, call_offset1);
d.AddServerTrailingMetadataOp(&f2, call_offset2);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 2u);
void* call_data = gpr_malloc_aligned(d.call_data_size, d.call_data_alignment);
InfallibleOperationExecutor<ServerMetadataHandle> transformer;
auto arena = SimpleArenaAllocator()->MakeArena();
promise_detail::Context<Arena> ctx(arena.get());
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
auto r =
transformer.Start(&d.server_trailing_metadata, std::move(md), call_data);
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"world");
gpr_free_aligned(call_data);
}
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////
// CallFilters

@ -359,9 +359,11 @@ class UnstartedCallDestinationFixture {
} // namespace grpc_core
#define GRPC_CALL_SPINE_BENCHMARK(Fixture) \
BENCHMARK(grpc_core::BM_UnaryWithSpawnPerEnd<Fixture>); \
BENCHMARK(grpc_core::BM_UnaryWithSpawnPerOp<Fixture>); \
BENCHMARK(grpc_core::BM_ClientToServerStreaming<Fixture>)
// Declare all relevant benchmarks for a given fixture
// Must be called within the grpc_core namespace
#define GRPC_CALL_SPINE_BENCHMARK(Fixture) \
BENCHMARK(BM_UnaryWithSpawnPerEnd<Fixture>); \
BENCHMARK(BM_UnaryWithSpawnPerOp<Fixture>); \
BENCHMARK(BM_ClientToServerStreaming<Fixture>)
#endif // GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H

@ -12,27 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_benchmark_args")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_package")
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark")
licenses(["notice"])
grpc_package(name = "test/cpp/microbenchmarks")
grpc_cc_test(
grpc_cc_benchmark(
name = "noop-benchmark",
srcs = ["noop-benchmark.cc"],
external_deps = [
"benchmark",
],
deps = ["//test/core/test_util:grpc_test_util"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_channel_args",
srcs = ["bm_channel_args.cc"],
external_deps = [
"benchmark",
"absl/container:btree",
],
deps = [
@ -42,11 +38,10 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_rng",
srcs = ["bm_rng.cc"],
external_deps = [
"benchmark",
"absl/container:btree",
],
deps = [
@ -56,46 +51,33 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_exec_ctx",
srcs = ["bm_exec_ctx.cc"],
args = grpc_benchmark_args(),
external_deps = [
"benchmark",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_event_engine_run",
size = "small",
srcs = ["bm_event_engine_run.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
"absl/debugging:leak_check",
"benchmark",
],
uses_polling = False,
deps = [
":helpers",
"//src/core:common_event_engine_closures",
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_thread_pool",
size = "small",
srcs = ["bm_thread_pool.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
"benchmark",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":helpers",
"//src/core:common_event_engine_closures",
@ -147,27 +129,19 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_closure",
srcs = ["bm_closure.cc"],
args = grpc_benchmark_args(),
tags = [
"no_mac",
"no_windows",
],
deps = [
":helpers",
"//src/core:closure",
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_huffman_decode",
srcs = ["bm_huffman_decode.cc"],
args = grpc_benchmark_args(),
tags = [
"no_mac",
"no_windows",
"nomsan",
"notsan",
"noubsan",
@ -178,36 +152,25 @@ grpc_cc_test(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_alarm",
srcs = ["bm_alarm.cc"],
args = grpc_benchmark_args(),
tags = [
"no_mac",
"no_windows",
],
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_arena",
size = "large",
srcs = ["bm_arena.cc"],
args = grpc_benchmark_args(),
tags = [
"no_mac",
"no_windows",
"notsan",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_byte_buffer",
srcs = ["bm_byte_buffer.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
],
@ -216,50 +179,36 @@ grpc_cc_test(
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_channel",
srcs = ["bm_channel.cc"],
args = grpc_benchmark_args(),
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_cq",
srcs = ["bm_cq.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
],
tags = [
"no_mac",
"no_windows",
],
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_cq_multiple_threads",
srcs = ["bm_cq_multiple_threads.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
],
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
@ -279,18 +228,13 @@ grpc_cc_library(
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_fullstack_streaming_ping_pong",
size = "large",
srcs = [
"bm_fullstack_streaming_ping_pong.cc",
],
args = grpc_benchmark_args(),
flaky = True,
tags = [
"no_mac", # to emulate "excluded_poll_engines: poll"
"no_windows",
],
deps = [":fullstack_streaming_ping_pong_h"],
)
@ -306,16 +250,11 @@ grpc_cc_library(
deps = [":helpers"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_fullstack_streaming_pump",
srcs = [
"bm_fullstack_streaming_pump.cc",
],
args = grpc_benchmark_args(),
tags = [
"no_mac", # to emulate "excluded_poll_engines: poll"
"no_windows",
],
deps = [":fullstack_streaming_pump_h"],
)
@ -331,61 +270,42 @@ grpc_cc_library(
deps = [":helpers_secure"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_fullstack_unary_ping_pong",
size = "large",
srcs = [
"bm_fullstack_unary_ping_pong.cc",
],
args = grpc_benchmark_args(),
tags = [
"no_mac", # to emulate "excluded_poll_engines: poll"
"no_windows",
],
deps = [":fullstack_unary_ping_pong_h"],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_fullstack_unary_ping_pong_chaotic_good",
size = "large",
srcs = [
"bm_fullstack_unary_ping_pong_chaotic_good.cc",
],
args = grpc_benchmark_args(),
tags = [
"no_mac", # to emulate "excluded_poll_engines: poll"
"no_windows",
],
deps = [
":fullstack_unary_ping_pong_h",
"//:grpcpp_chaotic_good",
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_chttp2_hpack",
srcs = ["bm_chttp2_hpack.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
],
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":helpers",
"//src/core:slice",
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_opencensus_plugin",
srcs = ["bm_opencensus_plugin.cc"],
args = grpc_benchmark_args(),
language = "C++",
deps = [
":helpers_secure",
"//:grpc_opencensus_plugin",
@ -424,17 +344,14 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_callback_unary_ping_pong",
size = "large",
srcs = [
"bm_callback_unary_ping_pong.cc",
],
args = grpc_benchmark_args(),
tags = [
"manual",
"no_mac",
"no_windows",
"notap",
],
deps = [":callback_unary_ping_pong_h"],
@ -455,38 +372,29 @@ grpc_cc_library(
],
)
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_callback_streaming_ping_pong",
size = "large",
srcs = [
"bm_callback_streaming_ping_pong.cc",
],
args = grpc_benchmark_args(),
tags = [
"manual",
"no_mac",
"no_windows",
"notap",
],
deps = [":callback_streaming_ping_pong_h"],
)
# TODO(hork): Generalize this for other work queue implementations
grpc_cc_test(
grpc_cc_benchmark(
name = "bm_basic_work_queue",
srcs = ["bm_basic_work_queue.cc"],
args = grpc_benchmark_args(),
external_deps = [
"absl/log:check",
"benchmark",
],
tags = [
"manual",
"no_windows",
"notap",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr",
"//src/core:common_event_engine_closures",

@ -13,6 +13,43 @@
# limitations under the License.
"""Configuration macros for grpc microbenchmarking"""
load("//bazel:grpc_build_system.bzl", "grpc_cc_test")
def grpc_benchmark_args():
"""Command line arguments for running a microbenchmark as a test"""
return ["--benchmark_min_time=0.001s"]
def grpc_cc_benchmark(name, external_deps = [], tags = [], uses_polling = False, uses_event_engine = False, **kwargs):
"""Base rule for gRPC benchmarks.
This is an opinionated configuration for gRPC benchmarks.
We disable uses_polling, uses_event_engine by default so that we minimize
unnecessary uses of CI time.
Similarly, we disable running on Windows, Mac to save testing time there
(our principle target for performance work is Linux).
linkstatic is enabled always: this is the configuration real binaries use, and
it affects performance, so we should use it on our benchmarks too!
Args:
name: base name of the test
external_deps: per grpc_cc_test
tags: per grpc_cc_test
uses_polling: per grpc_cc_test, but defaulted False
uses_event_engine: per grpc_cc_test, but defaulted False
**kwargs: per grpc_cc_test
"""
grpc_cc_test(
name = name,
args = grpc_benchmark_args(),
external_deps = ["benchmark"] + external_deps,
tags = tags + ["no_mac", "no_windows"],
uses_polling = uses_polling,
uses_event_engine = uses_event_engine,
# cc_binary defaults to 1, and we are interested in performance
# for that, so duplicate that setting here.
linkstatic = 1,
**kwargs
)

@ -46,10 +46,10 @@ DOCKERIMAGE_CURRENT_VERSIONS = {
"tools/dockerfile/distribtest/python_dev_bullseye_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_bullseye_x64@sha256:6c15bd5c350dbc206f45fb512418db51db9a3b73cc304d1d28f4df38bad5a702",
"tools/dockerfile/distribtest/python_dev_bullseye_x86.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_bullseye_x86@sha256:0cbb08f31a9d45ccdc41a88e409b215cac69cc1a9b05f456ebd96b398fd8e9ee",
"tools/dockerfile/distribtest/python_dev_centos7_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_centos7_x64@sha256:e32238c1c46a752881c6c165b918cb24bd182e3957dc72c514fdbd3d5c3312ab",
"tools/dockerfile/distribtest/python_dev_fedora38_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_fedora38_x64@sha256:c135480d3fa874766ea0468c29022ef0720605970843beeda33e3b178fe95ca7",
"tools/dockerfile/distribtest/python_dev_fedora39_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_fedora39_x64@sha256:19e636a63b7d812a0a70b3334ab45ad1593ace4303268eef42c2155770ef08d3",
"tools/dockerfile/distribtest/python_dev_ubuntu2004_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_ubuntu2004_x64@sha256:cdae114cff8a07ea5fe8738a5cf786075c78ae2bda886e3cc59bcd4890eb78ee",
"tools/dockerfile/distribtest/python_dev_ubuntu2204_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_ubuntu2204_x64@sha256:4fc253b24afa128d6295c4e1ac71ea4f8602c7153f4e2e0f38433c14236ec643",
"tools/dockerfile/distribtest/python_fedora38_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_fedora38_x64@sha256:8e632247ef6041550a4e28db96b36eddd8c5f5d18b12e7ca46efe5a18dfb7c41",
"tools/dockerfile/distribtest/python_fedora39_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_fedora39_x64@sha256:f2ad099b0d3553f37077705aaf6d221906fba88f0502e7131024a936ec40f58e",
"tools/dockerfile/distribtest/python_opensuse_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_opensuse_x64@sha256:962e835125423300b800e0448b698de0ff8397819b90c2a8c15fef281619f1b6",
"tools/dockerfile/distribtest/python_python38_buster_aarch64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_python38_buster_aarch64@sha256:0a93bf2a0303aebe1280bafad69df228b9444af9144c767d8169ecc70fb383f6",
"tools/dockerfile/distribtest/python_ubuntu2004_x64.current_version": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/python_ubuntu2004_x64@sha256:288cf72bc98fc384b9352d1f6d258b3513925ffe5746dda7e2e343723dd5f733",

@ -1 +0,0 @@
us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_fedora38_x64:f2f07b0a51d7f4d2fee3536335488c0c4405b871@sha256:c135480d3fa874766ea0468c29022ef0720605970843beeda33e3b178fe95ca7

@ -0,0 +1 @@
us-docker.pkg.dev/grpc-testing/testing-images-public/python_dev_fedora39_x64:c10f9536b4ab2d3ee79f4d67ebb60287b3dc4193@sha256:19e636a63b7d812a0a70b3334ab45ad1593ace4303268eef42c2155770ef08d3

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM fedora:38
FROM fedora:39
RUN yum clean all && yum update -y && yum install -y python3 python3-pip
RUN python3 -m pip install virtualenv

@ -1 +0,0 @@
us-docker.pkg.dev/grpc-testing/testing-images-public/python_fedora38_x64:7f4e4684a23266306eb942b42ed479c7db33fe91@sha256:8e632247ef6041550a4e28db96b36eddd8c5f5d18b12e7ca46efe5a18dfb7c41

@ -0,0 +1 @@
us-docker.pkg.dev/grpc-testing/testing-images-public/python_fedora39_x64:5b322d995cbd46dda593e32a32aced42f6b0089e@sha256:f2ad099b0d3553f37077705aaf6d221906fba88f0502e7131024a936ec40f58e

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM fedora:38
FROM fedora:39
RUN yum clean all && yum update -y && yum install -y python3 python3-pip
RUN python3 -m pip install virtualenv

@ -445,7 +445,7 @@ def targets():
# Python
PythonDistribTest("linux", "x64", "bullseye", presubmit=True),
PythonDistribTest("linux", "x86", "bullseye", presubmit=True),
PythonDistribTest("linux", "x64", "fedora38"),
PythonDistribTest("linux", "x64", "fedora39"),
PythonDistribTest("linux", "x64", "arch"),
PythonDistribTest("linux", "x64", "alpine"),
PythonDistribTest("linux", "x64", "ubuntu2204"),
@ -461,7 +461,7 @@ def targets():
PythonDistribTest(
"linux", "x86", "bullseye", source=True, presubmit=True
),
PythonDistribTest("linux", "x64", "fedora38", source=True),
PythonDistribTest("linux", "x64", "fedora39", source=True),
PythonDistribTest("linux", "x64", "arch", source=True),
PythonDistribTest("linux", "x64", "ubuntu2204", source=True),
# Ruby

@ -68,13 +68,13 @@
"uses_polling": false
},
{
"args": [],
"args": [
"--benchmark_min_time=0.001s"
],
"benchmark": true,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
@ -85,11 +85,9 @@
"name": "bm_experiments",
"platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"uses_polling": true
"uses_polling": false
},
{
"args": [
@ -98,9 +96,7 @@
"benchmark": true,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
@ -111,9 +107,7 @@
"name": "bm_http_client_filter",
"platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"uses_polling": false
},
@ -208,7 +202,9 @@
"uses_polling": true
},
{
"args": [],
"args": [
"--benchmark_min_time=0.001s"
],
"benchmark": true,
"ci_platforms": [
"linux",
@ -5866,7 +5862,9 @@
"uses_polling": false
},
{
"args": [],
"args": [
"--benchmark_min_time=0.001s"
],
"benchmark": true,
"ci_platforms": [
"linux",

@ -48,10 +48,7 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/ruby/ext/grpc/rb_grpc_imports.generated.h",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_log_severity_string(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
],
"gpr_log_severity_string": [],
"gpr_log(": [
"./include/grpc/support/log.h",
"./src/core/ext/filters/backend_metrics/backend_metric_filter.cc",

Loading…
Cancel
Save