@ -39,10 +39,12 @@
# include <atomic>
# include <cassert>
# include <cstdint>
# include "absl/base/internal/malloc_extension.h"
# include "absl/base/internal/raw_logging.h"
# include "absl/base/internal/thread_identity.h"
# include "absl/base/optimization.h"
# include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
@ -82,6 +84,42 @@ static void MaybeBecomeIdle() {
# define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
# endif
# endif
class Futex {
public :
static int WaitUntil ( std : : atomic < int32_t > * v , int32_t val ,
KernelTimeout t ) {
int err = 0 ;
if ( t . has_timeout ( ) ) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
struct timespec abs_timeout = t . MakeAbsTimespec ( ) ;
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
err = syscall (
SYS_futex , reinterpret_cast < int32_t * > ( v ) ,
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME , val ,
& abs_timeout , nullptr , FUTEX_BITSET_MATCH_ANY ) ;
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall ( SYS_futex , reinterpret_cast < int32_t * > ( v ) ,
FUTEX_WAIT | FUTEX_PRIVATE_FLAG , val , nullptr ) ;
}
if ( err ! = 0 ) {
err = - errno ;
}
return err ;
}
static int Wake ( std : : atomic < int32_t > * v , int32_t count ) {
int err = syscall ( SYS_futex , reinterpret_cast < int32_t * > ( v ) ,
FUTEX_WAKE | FUTEX_PRIVATE_FLAG , count ) ;
if ( ABSL_PREDICT_FALSE ( err < 0 ) ) {
err = - errno ;
}
return err ;
}
} ;
void Waiter : : Init ( ) {
futex_ . store ( 0 , std : : memory_order_relaxed ) ;
@ -91,7 +129,7 @@ bool Waiter::Wait(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
while ( true ) {
int x = futex_ . load ( std : : memory_order_relaxed ) ;
int32_t x = futex_ . load ( std : : memory_order_relaxed ) ;
if ( x ! = 0 ) {
if ( ! futex_ . compare_exchange_weak ( x , x - 1 ,
std : : memory_order_acquire ,
@ -101,30 +139,14 @@ bool Waiter::Wait(KernelTimeout t) {
return true ; // Consumed a wakeup, we are done.
}
int err = 0 ;
if ( t . has_timeout ( ) ) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
struct timespec abs_timeout = t . MakeAbsTimespec ( ) ;
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
err = syscall (
SYS_futex , reinterpret_cast < int * > ( & futex_ ) ,
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME , 0 ,
& abs_timeout , nullptr , FUTEX_BITSET_MATCH_ANY ) ;
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall ( SYS_futex , reinterpret_cast < int * > ( & futex_ ) ,
FUTEX_WAIT | FUTEX_PRIVATE_FLAG , 0 , nullptr ) ;
}
const int err = Futex : : WaitUntil ( & futex_ , 0 , t ) ;
if ( err ! = 0 ) {
if ( errno = = EINTR | | errno = = EWOULDBLOCK ) {
if ( err = = - EINTR | | err = = - EWOULDBLOCK ) {
// Do nothing, the loop will retry.
} else if ( errno = = ETIMEDOUT ) {
return false ; // Timeout.
} else if ( err = = - ETIMEDOUT ) {
return false ;
} else {
ABSL_RAW_LOG ( FATAL , " Futex operation failed with errn o %d \n " , errno ) ;
ABSL_RAW_LOG ( FATAL , " Futex operation failed with error %d \n " , err ) ;
}
}
@ -141,10 +163,9 @@ void Waiter::Post() {
void Waiter : : Poke ( ) {
// Wake one thread waiting on the futex.
int err = syscall ( SYS_futex , reinterpret_cast < int * > ( & futex_ ) ,
FUTEX_WAKE | FUTEX_PRIVATE_FLAG , 1 ) ;
if ( err < 0 ) {
ABSL_RAW_LOG ( FATAL , " FUTEX_WAKE failed with errno %d \n " , errno ) ;
const int err = Futex : : Wake ( & futex_ , 1 ) ;
if ( ABSL_PREDICT_FALSE ( err < 0 ) ) {
ABSL_RAW_LOG ( FATAL , " Futex operation failed with error %d \n " , err ) ;
}
}