[per-cpu] Change up the cpu caching mechanism (#34421)

Lets us sever the dependency between stats & exec ctx (finally).

More work likely needs to go into the *mechanism* used here (I'm not a
fan of the per thread index), but that's also something we can address
later.
pull/34446/head
Craig Tiller 1 year ago committed by GitHub
parent ffdb58dd5f
commit 1dabdfbe6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 2
      src/core/lib/gprpp/per_cpu.cc
  3. 38
      src/core/lib/gprpp/per_cpu.h
  4. 9
      src/core/lib/iomgr/exec_ctx.h
  5. 3
      test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer_corpus/clusterfuzz-testcase-minimized-resolver_fuzzer-5091818350903296.test
  6. 1
      test/cpp/microbenchmarks/bm_cq.cc
  7. 1
      test/cpp/microbenchmarks/bm_event_engine_run.cc
  8. 1
      test/cpp/microbenchmarks/bm_exec_ctx.cc
  9. 3
      test/cpp/microbenchmarks/bm_thread_pool.cc
  10. 2
      test/cpp/microbenchmarks/helpers.cc

@ -2560,7 +2560,6 @@ grpc_cc_library(
],
deps = [
"useful",
"//:exec_ctx",
"//:gpr",
],
)

@ -22,6 +22,8 @@
namespace grpc_core {
thread_local PerCpuShardingHelper::State PerCpuShardingHelper::state_;
size_t PerCpuOptions::Shards() {
return ShardsForCpuCount(gpr_cpu_num_cores());
}

@ -17,12 +17,18 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <algorithm>
#include <cstddef>
#include <limits>
#include <memory>
#include "src/core/lib/iomgr/exec_ctx.h"
#include <grpc/support/cpu.h>
// Sharded collections of objects
// This used to be per-cpu, now it's much less so - but still a way to limit
// contention.
namespace grpc_core {
@ -51,23 +57,39 @@ class PerCpuOptions {
size_t max_shards_ = std::numeric_limits<size_t>::max();
};
class PerCpuShardingHelper {
protected:
size_t GetShardingBits() {
if (GPR_UNLIKELY(state_.uses_until_refresh == 0)) state_ = State();
--state_.uses_until_refresh;
return state_.last_seen_cpu;
}
private:
struct State {
uint16_t last_seen_cpu = gpr_cpu_current_cpu();
uint16_t uses_until_refresh = 65535;
};
static thread_local State state_;
};
template <typename T>
class PerCpu {
class PerCpu : public PerCpuShardingHelper {
public:
// Options are not defaulted to try and force consideration of what the
// options specify.
explicit PerCpu(PerCpuOptions options) : cpus_(options.Shards()) {}
explicit PerCpu(PerCpuOptions options) : shards_(options.Shards()) {}
T& this_cpu() { return data_[ExecCtx::Get()->starting_cpu() % cpus_]; }
T& this_cpu() { return data_[GetShardingBits() % shards_]; }
T* begin() { return data_.get(); }
T* end() { return data_.get() + cpus_; }
T* end() { return data_.get() + shards_; }
const T* begin() const { return data_.get(); }
const T* end() const { return data_.get() + cpus_; }
const T* end() const { return data_.get() + shards_; }
private:
const size_t cpus_;
std::unique_ptr<T[]> data_{new T[cpus_]};
const size_t shards_;
std::unique_ptr<T[]> data_{new T[shards_]};
};
} // namespace grpc_core

@ -133,13 +133,6 @@ class GRPC_DLL ExecCtx {
ExecCtx(const ExecCtx&) = delete;
ExecCtx& operator=(const ExecCtx&) = delete;
unsigned starting_cpu() {
if (starting_cpu_ == std::numeric_limits<unsigned>::max()) {
starting_cpu_ = gpr_cpu_current_cpu();
}
return starting_cpu_;
}
struct CombinerData {
// currently active combiner: updated only via combiner.c
Combiner* active_combiner;
@ -218,8 +211,6 @@ class GRPC_DLL ExecCtx {
CombinerData combiner_data_ = {nullptr, nullptr};
uintptr_t flags_;
unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();
ScopedTimeCache time_cache_;
#if !defined(_WIN32) || !defined(_DLL)

@ -27,6 +27,7 @@
#include <grpcpp/impl/grpc_library.h>
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/completion_queue.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"

@ -28,6 +28,7 @@
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"

@ -20,6 +20,7 @@
#include <grpcpp/impl/grpc_library.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"

@ -21,10 +21,13 @@
#include "absl/strings/str_format.h"
#include <grpc/support/cpu.h>
#include <grpcpp/impl/grpc_library.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"

@ -20,6 +20,8 @@
#include <string.h>
#include <grpc/support/log.h>
static LibraryInitializer* g_libraryInitializer;
LibraryInitializer::LibraryInitializer() {

Loading…
Cancel
Save