神刀安全网

Conditional variable with futex

Condition variable with futex

Ten years ago (okay, actually nine years and a few months at the time of writing), the only holdout among major general-purpose operating systems gained support for condition variables with the release of Windows Vista. Five years later, the ISO C and ISO C++ standards also defined condition variables, making them the de jure standard low-level primitive for thread synchronization, alongside mutual exclusion locks ( mutex ).

Sometimes though, you need to reimplement them.

Condition variable

The basic functionality of condition variables is well understood and supported. There are three operations:

  1. Atomically unlock a mutex and go to sleep (with an optional time-out). Relock the mutex when woken up.
  2. Wake one sleeping thread up.
  3. Wake all sleeping threads up.

Differences

However the devil is in the details. There are a number of subtle differences and features in the different implementations of condition variables, notably (non-exhaustive list):

  • Possibility of initialization failure, abort, or warranty of success.
  • Support for static initialization.
  • Destruction being necessary to avoid leaking resources, provided as a no-op or sanity check or not provided at all.
  • Absolute time-out, relative time-out or both.
  • If absolute time-out are supported, which reference clock(s) are supported.
  • If applicable, which reference clock(s) is used for statically initialized condition variables.
  • Preservation of thread priorities.
  • Support for inter-process "shared" condition variables.
  • Support for (deferred) thread cancellation, or equivalent.

POSIX threads

POSIX threads allow for initialization failure (and in principles, even run-time usage failure), and requires explicit destruction of dynamically allocated condition variables. This results in needlessly intricate error handling code. More often than not, the error paths are poorly tested or completely untested and riddled with bugs.

POSIX also has the disadvantage, for historical/compatibility reasons, of using the real-time clock ( CLOCK_REALTIME ) by default. In most use cases involving time-outs, this is undesirable. The real-time clock is subject to warping, when it is adjusted manually or with NTP, invalidating timestamp values in running threads. The more useful monotonic clock ( CLOCK_MONOTONIC ) is available as a non-default choice, Yet it cannot be used for statically allocated condition variable.

Windows

On Windows, initialization cannot fail and destruction is implicit. This simplifies the code by not introducing error cases. On the downside, there is support for cancellation, which does not exist on Windows (and condition variable waiting is not alertable). Also, in the odd cases where they are actually necessary, wall clock time-outs are not supported.

ISO C11

I have heard the argument that C11 support for threads and synchronization primitives was so bad as to be useless. That might be (just slightly) excessive. For one thing, the additions of a concurrent memory model (even if it is allegedly very imperfect), and of atomic operations are very welcome in my opinion. And in all due fairness, the standard authors presumably intended to design the standard according to the lowest common denominator of all existing implementations.

That being said, the C11 condition variables are indeed very limited. Static initialization is impossible, which is peremptory is many scenarii. As far as I can tell though, the worst default is probably the use of absolute time-outs using the TIME_UTC wall clock. That makes all time-out subject to the time warping problem, and prevents writing correct code in most cases involving time-outs.

So what is this about?

This is about how to (re)implement condition variables with a lower-level primitive. That primitive happens to exist on the two most common general purpose operating system kernels (I believe):

  • Linux (including Android, GNU/Linux and other variants), and
  • Microsoft Windows.

I did that because I was discontent with the existing implementation on certain operating systems (strong hint which ones right above), and also because I was curious.

Futex

In general, thread synchronization primitives requires userspace programs to invoke system calls (or perform a context switch if threads are implemented in userspace). The system call is fundamentally unavoidable for putting a thread to sleep while waiting for another thread, or to wake up a another thread out of sleep. But for better performances, atomic operations are used on the fast path to avoid system calls.

For instance, acquiring a uncontended lock does not require a system call, at least no intrinsically. Releasing a lock that no other threads are waiting for, likewise.

The futex is the solution to this problem that was introduced in Linux, in 2003 to support for POSIX threads natively and efficiently (older solutions to that problem exists, e.g. on BeOS). Futex originally means fast userspace mutex , but the mechanism is used for all those synchronization primitives that involve waiting and waking, including condition variables. That is to say all primitives other than atomic operations, and those based on atomic operations only, such as spin locks.

The concept is now also implemented on Windows nearly identically (more on that below) though it is not named in the documentation.

Using futeces

A futex is essentially an address to an atomic integer. The address is used as the identifier for a queue of waiting threads. The value of the integer at that address is used both for to implement the fast path with atomic operations (if applicable), and to cope with corner case race conditions in case of contention.

Historically, manipulationg a futex involved architecture-specific assembler code for atomic operation . Nowadays, standard C/C++ 11 atomic operations can be used , greatly simplifying the job.

Futex operations

glibc does not provide a direct or shallow wrapper for futex . Instead it exposes the functionality via the POSIX threads and POSIX semaphores functions.

futex is actually a single system call with multiple operations. That may seem odd, even confusing if not outright ugly. However, that is common practice for a unique system call: the ioctl system call has far more operations than futex . As another example, programmers will not usually notice because glibc hides it, but all socket-related functions are implemented by the single socketcall system call.

I will not enumerate all the possible operations and flags here, only the two most important and basic operations and one flag. For details and for the exact system call prototype, the curious can check the futex(2) manual page .

FUTEX_WAIT

#include <stdatomic.h> #include <time.h> #include <linux/futex.h>  int futex_wait(atomic_int *addr, int val, const struct timespec *to) {         return sys_futex(addr, FUTEX_WAIT_PRIVATE, val, to, NULL, 0); }

This operation puts the calling thread to sleep, if and only if the value at address addr is equal to val . If to is not NULL , it contains a relative time-out value after which the thread wakes up.

Note that we use FUTEX_WAIT_PRIVATE , which is a short-hand for FUTEX_WAIT|FUTEX_PRIVATE_FLAG . The FUTEX_PRIVATE_FLAG flag indicates that addr is only visible to the calling process, so the kernel does not need to lock and check its virtual memory management state to find other threads using the same address in other processes.

FUTEX_WAKE

int futex_wake(atomic_int *addr, int nr) {         return sys_futex(addr, FUTEX_WAKE_PRIVATE, nr, NULL, NULL, 0); }

This operation wakes up to nr threads waiting on the futex located at address addr up. Typically, nr is either:

  • 1 to wake just one thread up, or
  • INT_MAX to wake all threads up.
int futex_signal(atomic_int *addr) {         return (futex_wake(addr, 1) >= 0) ? 0 : -1; }  int futex_broadcast(atomic_int *addr) {         return (futex_wake(addr, INT_MAX) >= 0) ? 0 : -1; }

As previously, FUTEX_WAKE_PRIVATE is a shorthand for FUTEX_WAKE|FUTEX_PRIVATE_FLAG .

Implementing a condition variable with a futex

Lets assume we already have a mutex implementation. How do we implement a condition variabe with a futex?

Simple but very wrong

The most obvious solution would be something as follows:

typedef struct cnd {     atomic_int value; } cnd_t;  /* Our static initializer */ #define CND_INITIALIZER_NP { ATOMIC_VAR_INIT(0) }  int cnd_init(cnd_t *cv) {     atomic_init(&cv->value, 0);     return thrd_success; }  void cnd_destroy(cnd_t *cv) {     (void) cv; }  int cnd_wait(cnd_t *cv, mtx_t *mtx) {     mtx_unlock(mtx),     futex_wait(&cv->value, 0, NULL);     mtx_lock(mtx);     return thrd_success; }  /* cnd_timedwait() omitted for simplicity */  int cnd_signal(cnd_t *cv) {     futex_signal(&cv->value);     return thrd_success; }  int cnd_broadcast(cnd_t *cv) {     futex_broadcast(&cv->value);     return thrd_success; }

Did you spot the outrageous bug there?

There is a very good reason why cnd_wait() requires a pointer to a mutex : to avoid lost signals. In this overly simplistic implementation, if cnd_signal() calls futex_wake() before the waiting thread actually starts sleeping in kernel, the call will be a no-op. Then when the sleeping thread actually gets to sleep in futex_wait() it gets stuck since it missed the wake-up.

That is why futex_wait() has a second parameter: to deal with contention in corner cases. The expected futex value in race-free cases must be determined by cnd_wait() before it unlocks the mutex. If there is a race with cnd_signal() , the futex value should have changed such that the kernel does not put the thread to sleep in futex_wait() , and returns immediately.

Counting waiters: too good to be true

So lets try to use the futex as a counter. If it worked, it would have two benefits:

  • the futex_wake() system call could be optimized away when there are zero waiters, and
  • cnd_destroy() could detect invalid attempts to destroy a condition variable still in use (non-zero waiters).
int cnd_wait(cnd_t *cv, mtx_t *mtx) {     unsigned refs = 1u + atomic_fetch_add(&cv->value, 1);     /* Add 1u for add-and-fetch semantics instead of fetch-and-add */      mtx_unlock(mtx),     futex_wait(&cv->value, refs, NULL);      atomic_fetch_sub(&cv->value, 1);     mtx_lock(mtx);     return thrd_success; }  int cnd_signal(cnd_t *cv) {     if (atomic_load(&cv->value) != 0)         futex_signal(&cv->value);     return thrd_success; }

The bugs are bleedingly obvious there. The same problem as in the first shot still remains. And there are other problems too. If you did not already spot them, here is a hint: wait if there are more than one concurrent waiting thread?

Sequence counter, close but no cigar

Given the semantics of futex_wait() , one essential requirement to avoid losing signals is for cnd_signal() to modify the value of the futex before waking any thread, so that it is different from the value before the waiting thread unlocked the mutex.

That way either of the following happens:

  • The waiting thread computes the futex value then, unlocks the mutex. It then goes to sleep before the futex value changes. The waking thread changes the value (with no effects), then wakes the waiting thread with a system call.
  • The waiting thread computes the futex value then unlocks the mutex. Then a race happens with the waking thread. The waking thread changes the futex value. The waiting thread call to futex_wait() is a no-op because the futex value does not match.

Either way, the waiting thread is not staying asleep.

The simplest way to implement this is a sequence counter. The Bionic C library used by Android works that way (simplified for clarity):

int cnd_wait(cnd_t *cv, mtx_t *mtx) {     int val = atomic_load(&cv->value);      mtx_unlock(mtx),     futex_wait(&cv->value, val, NULL);     mtx_lock(mtx);     return thrd_success; }  int cnd_signal(cnd_t *cv) {     atomic_fetch_add(&cv->value, 1);     futex_wake(&cv->value);     return thrd_success;  }

The bug here is almost as well documented as the implementation. To be honest, it is extremely unlikely to occur in real use.

If cnd_signal() is called exactly 4294967296 times in a row before the waiting thread makes progress, the sequence number wraps around, gets its original value, and the signal is lost.

To be precise, that would occur for exactly a non-zero multiple of two to the power CHAR_BIT * sizeof (int) (which equals 32 on all Linux systems, to my knowledge). To avoid that issue or any variant thereof, we need to ensure that the futex value set by cnd_signal() is never equal to a value computed by cnd_wait() .

Toggle

In the race-free scenario, the futex value must be unchanged so the thread goes to sleep. In the race scenario, as we just saw, the futex value must be changed to a value never used for sleeping.

Those two requirements can only be accommodated by making both cnd_waitl() and cnd_signal() modify the futex.

There is another requirement in case more than one thread is sleeping on the same condition variable. That takes us back to the other problem with the waiters counting approach.

If more than one thread goes to sleep in a row, the second one must not change the futex value. Otherwise, the first thread would potentially fail to go to sleep due to the changed futex value. The more threads wait on the same condition variable, the more likely the problem. With enough threads, it could degrade into a live loop .

With that in mind, I think that the simplest solution is as follows:

int cnd_wait(cnd_t *cv, mtx_t *mtx) {     atomic_store(&cv->value, 1);      mtx_unlock(mtx),     futex_wait(&cv->value, 1, NULL);     mtx_lock(mtx);     return thrd_success; }  int cnd_signal(cnd_t *cv) {     atomic_store(&cv->value, 0);     futex_wake(&cv->value);     return thrd_success; }

And, correct me if I am wrong, but I believe it works. We however use only 1 bit and waste the other futex 31 bits. A slightly better version that preserves those bits so they could be used for flags (they must be constant):

int cnd_wait(cnd_t *cv, mtx_t *mtx) {     /* Set lowest order bit */     int value = atomic_fetch_and(&cv->value, 1) | 1;      mtx_unlock(mtx),     futex_wait(&cv->value, value, NULL);     mtx_lock(mtx);     return thrd_success; }  int cnd_signal(cnd_t *cv) {     /* Clear lowest order bit */     atomic_fetch_and(&cv->value, -2);     futex_wake(&cv->value);     return thrd_success; }

Relaxed memory barriers

For simplicity, implicit barriers were used so far. In practice, atomic operations can be explicitly relaxed ( memory_order_relaxed ). The necessary memory barriers are provided by the mutex, not the condition variable , so do not need to add any, e.g.:

int cnd_wait(cnd_t *cv, mtx_t *mtx) {         int value = atomic_fetch_and_explicit(&cv->value, 1,                                               memory_order_relaxed) | 1;          mtx_unlock(mtx),         futex_wait(&cv->value, value, NULL);         mtx_lock(mtx);         return thrd_success; }  int cnd_signal(cnd_t *cv) {         atomic_fetch_and_explicit(&cv->value, -2, memory_order_relaxed);         futex_wake(&cv->value);         return thrd_success; }

Outstanding issues

Timed wait

We did not provide an implementation for cnd_timedwait() . This can trivially be added. The third parameter to futex_wait() should be a non- NULL pointer to a struct timespec providing the time-out duration.

Also the return value must be checked to distinguish time-outs from wake-ups.

Note that the time-out would follow the monotonic clock. If you want true UTC clock semantics (as in ISO C11), you need to use FUTEX_CLOCK_REALTIME flag to the futex system call.

System call optimization

We did not optimize the waking system call away when there no waiters. A new member would have to be added struct cnd , e.g. an atomic waiter count variable. Be careful with ordering of atomic operations though.

Mutex contention

We did not spend much time looking at cnd_broadcast() . We assumed that we can substitute futex_signal() with futex_broadcast() in cnd_signal() .

All waiting threads will wake up at the same time, and compete for the mutex in mtx_lock() . Only one thread gets it, all other go immediately back to sleep.

Linux provides the futex requeue operation to deal with that problem. due to contention on the mutex.

Priorities and fairness

We did not look at waiter fairness at all here. This is not usually a problem, but you should keep that in mind if you plan to make your own futex-based operations.

We also did not consider thread priorities. Again, this is not a problem in typical applications, but it is obviously an issue in real-time use cases. For those, Linux has priority-inheritance (PI) futex operations.

Windows support

With Windows 8, futex-like operations are possible. In general, the Windows support is a strict subset of Linux support: it only provides wait, timed wait, wake one and wake all operations. There are no flags to distinguish private and shared addresses, and to support the UTC clock. There are also none of the advanced operations that we did not go through.

In all due fairness, Windows has one feature that Linux does not have. The futex do not have to be int . Any data type of 1, 2, 4 or 8 bytes is supported.

#include <windows.h>  int futex_wait(atomic_int *addr, int val, const struct timespec *to) {         if (to == NULL)         {                 WaitOnAddress((volatile void *)addr, &val, sizeof (val),                               -1);                 return 0;         }          if (to->tv_nsec >= 1000000000)         {                 errno = EINVAL;                 return -1;         }          if (to->tv_sec >= 2147)         {                 WaitOnAddress((volatile void *)addr, &val, sizeof (val),                               2147000000);                 return 0; /* time-out out of range, claim spurious wake-up */         }          DWORD ms = (to->tv_sec * 1000000)                  + ((to>tv_nsec + 999) / 1000);          if (!WaitOnAddress((volatile void *)addr, &val,                            sizeof (val), ms))         {             errno = ETIMEDOUT;             return -1;         }         return 0; }  int futex_signal(atomic_int *addr) {         WakeByAddressSingle(addr);         return 0; }  int futex_broadcast(atomic_int *addr) {         WakeByAddressAll(addr);         return 0; }

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » Conditional variable with futex

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址