Generalize pthread TLS to any trivial type (#27009)

* Avoid copy constructor

```
src/core/lib/gpr/log_linux.cc:78:33: error: copying variable of type 'grpc_core::PthreadTlsImpl<long>' invokes deleted constructor
  static GPR_THREAD_LOCAL(long) tid = 0;
                                ^     ~
./src/core/lib/gpr/tls.h:64:3: note: 'PthreadTlsImpl' has been explicitly marked deleted here
  PthreadTlsImpl(const PthreadTlsImpl&) = delete;
  ^
1 error generated.
```

* Generalize pthread TLS to any trivial type

Use multiple pthread keys for types larger than a machine word.

Implement generic timer TLS optimization on all platforms.
pull/27191/head
Tamir Duberstein 3 years ago committed by GitHub
parent c66d2cc084
commit 76e95f6afd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      CMakeLists.txt
  2. 19
      build_autogenerated.yaml
  3. 2
      src/core/lib/gpr/log_linux.cc
  4. 78
      src/core/lib/gpr/tls.h
  5. 32
      src/core/lib/iomgr/timer_generic.cc
  6. 3
      test/core/gpr/BUILD
  7. 54
      test/core/gpr/tls_test.cc
  8. 48
      tools/run_tests/generated/tests.json

64
CMakeLists.txt generated

@ -718,7 +718,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c timeout_encoding_test) add_dependencies(buildtests_c timeout_encoding_test)
add_dependencies(buildtests_c timer_heap_test) add_dependencies(buildtests_c timer_heap_test)
add_dependencies(buildtests_c timer_list_test) add_dependencies(buildtests_c timer_list_test)
add_dependencies(buildtests_c tls_test)
add_dependencies(buildtests_c transport_security_common_api_test) add_dependencies(buildtests_c transport_security_common_api_test)
add_dependencies(buildtests_c transport_security_test) add_dependencies(buildtests_c transport_security_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -1001,6 +1000,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx time_util_test) add_dependencies(buildtests_cxx time_util_test)
add_dependencies(buildtests_cxx timer_test) add_dependencies(buildtests_cxx timer_test)
add_dependencies(buildtests_cxx tls_security_connector_test) add_dependencies(buildtests_cxx tls_security_connector_test)
add_dependencies(buildtests_cxx tls_test)
add_dependencies(buildtests_cxx too_many_pings_test) add_dependencies(buildtests_cxx too_many_pings_test)
add_dependencies(buildtests_cxx transport_stream_receiver_test) add_dependencies(buildtests_cxx transport_stream_receiver_test)
add_dependencies(buildtests_cxx try_join_test) add_dependencies(buildtests_cxx try_join_test)
@ -7537,33 +7537,6 @@ target_link_libraries(timer_list_test
) )
endif()
if(gRPC_BUILD_TESTS)
add_executable(tls_test
test/core/gpr/tls_test.cc
)
target_include_directories(tls_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(tls_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)
@ -15836,6 +15809,41 @@ target_link_libraries(tls_security_connector_test
) )
endif()
if(gRPC_BUILD_TESTS)
add_executable(tls_test
test/core/gpr/tls_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(tls_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(tls_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)

@ -4151,15 +4151,6 @@ targets:
deps: deps:
- grpc_test_util - grpc_test_util
uses_polling: false uses_polling: false
- name: tls_test
build: test
language: c
headers: []
src:
- test/core/gpr/tls_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: transport_security_common_api_test - name: transport_security_common_api_test
build: test build: test
language: c language: c
@ -7440,6 +7431,16 @@ targets:
- test/core/security/tls_security_connector_test.cc - test/core/security/tls_security_connector_test.cc
deps: deps:
- grpc_test_util - grpc_test_util
- name: tls_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/gpr/tls_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: too_many_pings_test - name: too_many_pings_test
gtest: true gtest: true
build: test build: test

@ -75,7 +75,7 @@ void gpr_default_log(gpr_log_func_args* args) {
time_t timer; time_t timer;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm; struct tm tm;
static GPR_THREAD_LOCAL(long) tid = 0; static GPR_THREAD_LOCAL(long) tid(0);
if (tid == 0) tid = sys_gettid(); if (tid == 0) tid = sys_gettid();
timer = static_cast<time_t>(now.tv_sec); timer = static_cast<time_t>(now.tv_sec);

@ -35,15 +35,10 @@ namespace grpc_core {
// This class is never instantiated. It exists to statically ensure that all // This class is never instantiated. It exists to statically ensure that all
// TLS usage is compatible with the most restrictive implementation, allowing // TLS usage is compatible with the most restrictive implementation, allowing
// developers to write correct code regardless of the platform they develop on. // developers to write correct code regardless of the platform they develop on.
template <typename T, typename = typename std::enable_if<( template <typename T>
std::is_trivially_destructible<T>::value &&
sizeof(T) <= sizeof(void*) &&
alignof(void*) % alignof(T) == 0)>::type>
class TlsTypeConstrainer { class TlsTypeConstrainer {
static_assert(std::is_trivially_destructible<T>::value && static_assert(std::is_trivial<T>::value,
sizeof(T) <= sizeof(void*) && "TLS support is limited to trivial types");
alignof(void*) % alignof(T) == 0,
"unsupported type for TLS");
public: public:
using Type = T; using Type = T;
@ -55,6 +50,8 @@ class TlsTypeConstrainer {
#include <grpc/support/log.h> /* for GPR_ASSERT */ #include <grpc/support/log.h> /* for GPR_ASSERT */
#include <pthread.h> #include <pthread.h>
#include <array>
#include <cstring>
namespace grpc_core { namespace grpc_core {
@ -91,50 +88,51 @@ class PthreadTlsImpl : TlsTypeConstrainer<T> {
// //
// https://google.github.io/styleguide/cppguide.html#Static_and_Global_Variables // https://google.github.io/styleguide/cppguide.html#Static_and_Global_Variables
PthreadTlsImpl() PthreadTlsImpl()
: key_([]() { : keys_([]() {
pthread_key_t key; typename std::remove_const<decltype(PthreadTlsImpl::keys_)>::type
keys;
for (pthread_key_t& key : keys) {
GPR_ASSERT(0 == pthread_key_create(&key, nullptr)); GPR_ASSERT(0 == pthread_key_create(&key, nullptr));
return key; }
return keys;
}()) {} }()) {}
PthreadTlsImpl(T t) : PthreadTlsImpl() { *this = t; } PthreadTlsImpl(T t) : PthreadTlsImpl() { *this = t; }
~PthreadTlsImpl() { GPR_ASSERT(0 == pthread_key_delete(key_)); } ~PthreadTlsImpl() {
for (pthread_key_t key : keys_) {
operator T() const { return Cast<T>(pthread_getspecific(key_)); } GPR_ASSERT(0 == pthread_key_delete(key));
T operator=(T t) {
GPR_ASSERT(0 == pthread_setspecific(key_, Cast<T>(t)));
return t;
} }
private:
// TODO(C++17): Replace these helpers with constexpr if statements inline.
template <typename V>
static typename std::enable_if<std::is_pointer<V>::value, V>::type Cast(
void* object) {
return static_cast<V>(object);
} }
template <typename V> operator T() const {
static typename std::enable_if< T t;
!std::is_pointer<V>::value && std::is_integral<V>::value, V>::type char* dst = reinterpret_cast<char*>(&t);
Cast(void* object) { for (pthread_key_t key : keys_) {
return reinterpret_cast<uintptr_t>(object); uintptr_t src = uintptr_t(pthread_getspecific(key));
size_t remaining = reinterpret_cast<char*>(&t + 1) - dst;
size_t step = std::min(sizeof(src), remaining);
memcpy(dst, &src, step);
dst += step;
} }
template <typename V>
static void* Cast(
typename std::enable_if<std::is_pointer<V>::value, V>::type t) {
return t; return t;
} }
template <typename V> T operator=(T t) {
static void* Cast(typename std::enable_if<!std::is_pointer<V>::value && char* src = reinterpret_cast<char*>(&t);
std::is_integral<V>::value, for (pthread_key_t key : keys_) {
V>::type t) { uintptr_t dst;
return reinterpret_cast<void*>(uintptr_t(t)); size_t remaining = reinterpret_cast<char*>(&t + 1) - src;
size_t step = std::min(sizeof(dst), remaining);
memcpy(&dst, src, step);
GPR_ASSERT(0 == pthread_setspecific(key, reinterpret_cast<void*>(dst)));
src += step;
}
return t;
} }
const pthread_key_t key_; private:
const std::array<pthread_key_t,
(sizeof(T) + sizeof(void*) - 1) / sizeof(void*)>
keys_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -212,19 +212,11 @@ static void validate_non_pending_timer(grpc_timer* t) {
#endif #endif
#if GPR_ARCH_64
/* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is
for intptr_t which means on 32-bit machines it is not wide enough to hold
grpc_millis which is 64-bit. Adding thread local support for 64 bit values
is a lot of work for very little gain. So we are currently restricting this
optimization to only 64 bit machines */
/* Thread local variable that stores the deadline of the next timer the thread /* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking * has last-seen. This is an optimization to prevent the thread from checking
* shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
* an expensive operation) */ * an expensive operation) */
static GPR_THREAD_LOCAL(grpc_millis) g_last_seen_min_timer; static GPR_THREAD_LOCAL(grpc_millis) g_last_seen_min_timer;
#endif
struct shared_mutables { struct shared_mutables {
/* The deadline of the next timer due across all timer shards */ /* The deadline of the next timer due across all timer shards */
@ -269,10 +261,7 @@ static void timer_list_init() {
gpr_mu_init(&g_shared_mutables.mu); gpr_mu_init(&g_shared_mutables.mu);
g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now(); g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();
#if GPR_ARCH_64
g_last_seen_min_timer = 0; g_last_seen_min_timer = 0;
#endif
for (i = 0; i < g_num_shards; i++) { for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i]; timer_shard* shard = &g_shards[i];
@ -301,11 +290,6 @@ static void timer_list_shutdown() {
grpc_timer_heap_destroy(&shard->heap); grpc_timer_heap_destroy(&shard->heap);
} }
gpr_mu_destroy(&g_shared_mutables.mu); gpr_mu_destroy(&g_shared_mutables.mu);
#if GPR_ARCH_64
#endif
gpr_free(g_shards); gpr_free(g_shards);
gpr_free(g_shard_queue); gpr_free(g_shard_queue);
g_shared_mutables.initialized = false; g_shared_mutables.initialized = false;
@ -452,10 +436,8 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
} }
static void timer_consume_kick(void) { static void timer_consume_kick(void) {
#if GPR_ARCH_64
/* Force re-evaluation of last seen min */ /* Force re-evaluation of last seen min */
g_last_seen_min_timer = 0; g_last_seen_min_timer = 0;
#endif
} }
static void timer_cancel(grpc_timer* timer) { static void timer_cancel(grpc_timer* timer) {
@ -592,7 +574,6 @@ static grpc_timer_check_result run_some_expired_timers(
// safe since we know that both are pointer types and 64-bit wide // safe since we know that both are pointer types and 64-bit wide
grpc_millis min_timer = static_cast<grpc_millis>( grpc_millis min_timer = static_cast<grpc_millis>(
gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
g_last_seen_min_timer = min_timer;
#else #else
// On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
// (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
@ -601,6 +582,8 @@ static grpc_timer_check_result run_some_expired_timers(
grpc_millis min_timer = g_shared_mutables.min_timer; grpc_millis min_timer = g_shared_mutables.min_timer;
gpr_mu_unlock(&g_shared_mutables.mu); gpr_mu_unlock(&g_shared_mutables.mu);
#endif #endif
g_last_seen_min_timer = min_timer;
if (now < min_timer) { if (now < min_timer) {
if (next != nullptr) *next = GPR_MIN(*next, min_timer); if (next != nullptr) *next = GPR_MIN(*next, min_timer);
return GRPC_TIMERS_CHECKED_AND_EMPTY; return GRPC_TIMERS_CHECKED_AND_EMPTY;
@ -676,20 +659,9 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
// prelude // prelude
grpc_millis now = grpc_core::ExecCtx::Get()->Now(); grpc_millis now = grpc_core::ExecCtx::Get()->Now();
#if GPR_ARCH_64
/* fetch from a thread-local first: this avoids contention on a globally /* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */ mutable cacheline in the common case */
grpc_millis min_timer = g_last_seen_min_timer; grpc_millis min_timer = g_last_seen_min_timer;
#else
// On 32-bit systems, we currently do not have thread local support for 64-bit
// types. In this case, directly read from g_shared_mutables.min_timer.
// Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
// on 64-bit types (like grpc_millis). So all reads and writes to
// g_shared_mutables.min_timer are done under g_shared_mutables.mu
gpr_mu_lock(&g_shared_mutables.mu);
grpc_millis min_timer = g_shared_mutables.min_timer;
gpr_mu_unlock(&g_shared_mutables.mu);
#endif
if (now < min_timer) { if (now < min_timer) {
if (next != nullptr) { if (next != nullptr) {

@ -134,6 +134,9 @@ grpc_cc_test(
grpc_cc_test( grpc_cc_test(
name = "tls_test", name = "tls_test",
srcs = ["tls_test.cc"], srcs = ["tls_test.cc"],
external_deps = [
"gtest",
],
language = "C++", language = "C++",
uses_polling = False, uses_polling = False,
deps = [ deps = [

@ -20,45 +20,47 @@
#include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/tls.h"
#include <stdio.h> #include <gtest/gtest.h>
#include <stdlib.h> #include <array>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
#define NUM_THREADS 100 struct BiggerThanMachineWord {
size_t a, b;
static GPR_THREAD_LOCAL(intptr_t) test_var; uint8_t c;
};
static void thd_body(void* /*arg*/) {
intptr_t i;
GPR_ASSERT(test_var == 0); static GPR_THREAD_LOCAL(BiggerThanMachineWord) test_var;
// Fails to compile: static GPR_THREAD_LOCAL(std::unique_ptr<char>) non_trivial;
for (i = 0; i < 100000; i++) { namespace {
test_var = i; void thd_body(void*) {
GPR_ASSERT(test_var == i); for (size_t i = 0; i < 100000; i++) {
BiggerThanMachineWord next = {i, i, uint8_t(i)};
test_var = next;
BiggerThanMachineWord read = test_var;
ASSERT_EQ(read.a, i);
ASSERT_EQ(read.b, i);
ASSERT_EQ(read.c, uint8_t(i)) << i;
} }
test_var = 0;
} }
/* ------------------------------------------------- */ TEST(ThreadLocal, ReadWrite) {
std::array<grpc_core::Thread, 100> threads;
int main(int argc, char* argv[]) { for (grpc_core::Thread& th : threads) {
grpc_core::Thread threads[NUM_THREADS];
grpc::testing::TestEnvironment env(argc, argv);
for (auto& th : threads) {
th = grpc_core::Thread("grpc_tls_test", thd_body, nullptr); th = grpc_core::Thread("grpc_tls_test", thd_body, nullptr);
th.Start(); th.Start();
} }
for (auto& th : threads) { for (grpc_core::Thread& th : threads) {
th.Join(); th.Join();
} }
}
return 0; } // namespace
int main(int argc, char* argv[]) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
} }

@ -2939,30 +2939,6 @@
], ],
"uses_polling": false "uses_polling": false
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "tls_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,
@ -7081,6 +7057,30 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "tls_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save