By Anthony Williams

This article explores how to synchronize concurrent operations in C++, and how one thread can wait efficiently for another to complete its work.

 

Save 37% off C++ Concurrency in Action, Second Edition with code fccwilliams at manning.com.

 

Waiting for an event or other condition

Suppose you’re traveling on an overnight train. One way to ensure you get off at the right station would be to stay awake all night and pay attention to where the train stops. You wouldn’t miss your station, but you’d be tired when you got there. Alternatively, you could look at the timetable to see when the train is supposed to arrive, set your alarm a bit before, and go to sleep. That’d be OK; you wouldn’t miss your stop, but if the train got delayed, you’d wake up too early. There is also a possibility that your alarm clock’s batteries might die, and you oversleep and miss your station. What would be ideal is if you could go to sleep and have somebody or something wake you up when the train gets to your station, whenever that happens.

How does that relate to threads? Well, if one thread is waiting for a second thread to complete a task, it has several options. First, it could keep checking a flag in shared data (protected by a mutex) and have the second thread set the flag when it completes the task. This is wasteful on two counts: the thread consumes valuable processing time by repeatedly checking the flag, and when the mutex is locked by the waiting thread, it can’t be locked by any other thread. Both work against the thread doing the waiting, because they limit the resources available to the thread being waited for, and even prevent it from setting the flag when it’s done. This is akin to staying awake all night talking to the train driver: he has to drive the train more slowly because you keep distracting him, and it takes longer to get there. Similarly, the waiting thread is consuming resources that could be used by other threads in the system and may end up waiting longer than necessary.

A second option is to have the waiting thread sleep for small periods between the checks using the std::this_thread::sleep_for() function:

 
bool flag;
std::mutex m;
void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m);
    while(!flag)                                  
    {
        lk.unlock();                                                  ❶ 
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  ❷ 
        lk.lock();                                                    ❸ 
    }
}
 

❶   Unlock the mutex

❷   Sleep for 100 ms

❸   Relock the mutex

In the loop, the function unlocks the mutex ❶   before the sleep ❷   and locks it again afterward ❸  , and another thread gets a chance to acquire it and set the flag.

This is an improvement, because the thread doesn’t waste processing time while it’s sleeping, but it’s hard to get the sleep period right. Too short a sleep between checks causes the thread to waste processing time checking; too long a sleep and the thread keeps on sleeping even when the task it’s waiting for is complete, introducing a delay. It’s rare that this oversleeping has a direct impact on the operation of the program, but it could mean dropped frames in a fast-paced game or overrunning a time slice in a real-time application.

The third, and preferred, option is to use the facilities from the C++ Standard Library to wait for the event itself. The most basic mechanism for waiting for an event to be triggered by another thread (such as the presence of additional work in the pipeline mentioned previously) is the condition variable. Conceptually, a condition variable is associated with some event or other condition, and one or more threads can wait for that condition to be satisfied. When a thread determines that the condition is satisfied, it notifies one or more of the threads waiting on the condition variable to wake them up and allow them to continue processing.

Waiting for a condition with condition variables

The Standard C++ Library provides not one, but two implementations of a condition variable: std::condition_variable and std::condition_variable_any. Both are declared in the <condition_variable> library header. In both cases, they need to work with a mutex in order to provide appropriate synchronization; the former is limited to working with std::mutex, whereas the latter can work with anything that meets some minimal criteria for being mutex-like, hence the _any suffix. Because std::condition_variable_any is more general, there's the potential for additional costs in terms of size, performance, or operating system resources, and std::condition_variable should be preferred unless the additional flexibility is required.
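As a minimal sketch of the extra flexibility (assuming C++17 for std::shared_mutex; the names set_ready and wait_for_ready are illustrative), std::condition_variable_any can wait while holding a std::shared_lock, which std::condition_variable cannot, because the latter only accepts std::unique_lock<std::mutex>:

```cpp
#include <condition_variable>
#include <mutex>
#include <shared_mutex>

std::shared_mutex rw_mutex;
std::condition_variable_any cv;   // accepts any mutex-like lockable
bool ready = false;

void set_ready()
{
    {
        std::lock_guard<std::shared_mutex> lk(rw_mutex);  // exclusive lock
        ready = true;
    }
    cv.notify_all();
}

bool wait_for_ready()
{
    std::shared_lock<std::shared_mutex> lk(rw_mutex);  // shared (reader) lock
    cv.wait(lk, []{ return ready; });  // condition_variable couldn't do this
    return ready;
}
```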

How do you use a std::condition_variable to handle the example in the introduction—how do you let the thread waiting for work sleep until there’s data to process? The following listing shows one way you could do this with a condition variable.

Listing 1 Waiting for data to process with a std::condition_variable

 
std::mutex mut;
std::queue<data_chunk> data_queue;                 ❶ 
std::condition_variable data_cond;
void data_preparation_thread()
{
    while(more_data_to_prepare())
    {
        data_chunk const data=prepare_data();
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);                     ❷ 
        data_cond.notify_one();                    ❸ 
    }
}
void data_processing_thread()
{
    while(true)
    {
        std::unique_lock<std::mutex> lk(mut);      ❹ 
        data_cond.wait(
            lk,[]{return !data_queue.empty();});   ❺ 
        data_chunk data=data_queue.front();
        data_queue.pop();
        lk.unlock();                               ❻ 
        process(data);
        if(is_last_chunk(data))
            break;
    }
}
 
 

First off, you have the queue ❶  , which is used to pass the data between the two threads. When the data is ready, the thread preparing the data locks the mutex protecting the queue using a std::lock_guard and pushes the data onto the queue ❷  . It then calls the notify_one() member function on the std::condition_variable instance to notify the waiting thread (if there is one) ❸  .

On the other side of the fence, you have the processing thread. This thread first locks the mutex, but this time with a std::unique_lock rather than a std::lock_guard ❹  —you’ll see why in a minute. The thread then calls wait() on the std::condition_variable, passing in the lock object and a lambda function that expresses the condition being waited for ❺  . Lambda functions are a new feature in C++11 that allow you to write an anonymous function as part of another expression, and they’re ideally suited for specifying predicates for standard library functions such as wait(). In this case, the simple lambda function []{return !data_queue.empty();} checks to see if the data_queue isn’t empty()—if there is some data in the queue ready for processing.

The implementation of wait() checks the condition (by calling the supplied lambda function) and returns if it’s satisfied (the lambda function returns true). If the condition isn’t satisfied (the lambda function returns false), wait() unlocks the mutex and puts the thread in a blocked or waiting state. When the condition variable is notified by a call to notify_one() from the data-preparation thread, the thread wakes from its slumber, reacquires the lock on the mutex, and checks the condition again, returning from wait() with the mutex still locked if the condition is satisfied. If the condition isn’t satisfied, the thread unlocks the mutex and resumes waiting. This is why you need the std::unique_lock rather than the std::lock_guard—the waiting thread must unlock the mutex while it’s waiting and lock it again afterward, and std::lock_guard doesn’t provide that flexibility. If the mutex remained locked while the thread was sleeping, the data-preparation thread wouldn’t be able to lock the mutex to add an item to the queue, and the waiting thread would never be able to see its condition satisfied.

Listing 1 uses a simple lambda function for the wait ❺  , which checks to see if the queue isn’t empty, but any function or callable object could be passed. If you already have a function to check the condition (perhaps because it’s more complicated than a simple test like this), then this function can be passed directly; there’s no need to wrap it in a lambda. During a call to wait(), a condition variable may check the supplied condition any number of times, but it always does this with the mutex locked and returns immediately if (and only if) the function provided to test the condition returns true. When the waiting thread reacquires the mutex and checks the condition, if it isn’t in direct response to a notification from another thread, it’s called a spurious wake. Because the number and frequency of any such spurious wakes are indeterminate, it isn’t advisable to use a function with side effects for the condition check. If you do, you must be prepared for the side effects to occur multiple times.
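The predicated overload of wait() used in listing 1 behaves like a hand-written loop, which makes the handling of spurious wakes explicit. A minimal sketch (the function name wait_and_get is illustrative, not from the listing):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex mut;
std::queue<int> data_queue;
std::condition_variable data_cond;

// Equivalent to data_cond.wait(lk, []{return !data_queue.empty();}):
// the loop re-tests the condition after every wake, so a spurious wake
// simply goes back to sleep.
int wait_and_get()
{
    std::unique_lock<std::mutex> lk(mut);
    while(data_queue.empty())   // re-checked on every wake, spurious or not
        data_cond.wait(lk);     // unlocks mut while blocked, relocks on wake
    int value = data_queue.front();
    data_queue.pop();
    return value;
}
```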

The flexibility to unlock a std::unique_lock isn’t only used for the call to wait(); it’s also used once you have the data to process but before processing it ❻  . Processing data can potentially be a time-consuming operation, and it’s a bad idea to hold a lock on a mutex for longer than necessary.

Using a queue to transfer data between threads, as in listing 1, is a common scenario. Done well, the synchronization can be limited to the queue itself, which greatly reduces the possible number of synchronization problems and race conditions. In view of this, let’s now work on extracting a generic thread-safe queue from listing 1.

Building a thread-safe queue with condition variables

If you’re going to be designing a generic queue, it’s worth spending a few minutes thinking about the operations which are likely to be required. Let’s look at the C++ Standard Library for inspiration, in the form of the std::queue<> container adaptor shown in the following listing.

Listing 2 std::queue interface

 
template <class T, class Container = std::deque<T> >
class queue {
public:
    explicit queue(const Container&);
    explicit queue(Container&& = Container());
    template <class Alloc> explicit queue(const Alloc&);
    template <class Alloc> queue(const Container&, const Alloc&);
    template <class Alloc> queue(Container&&, const Alloc&);
    template <class Alloc> queue(queue&&, const Alloc&);
    void swap(queue& q);
    bool empty() const;
    size_type size() const;
    T& front();
    const T& front() const;
    T& back();
    const T& back() const;
    void push(const T& x);
    void push(T&& x);
    void pop();
    template <class... Args> void emplace(Args&&... args);
};
 
 

If you ignore the construction, assignment and swap operations, you’re left with three groups of operations: those that query the state of the whole queue (empty() and size()), those that query the elements of the queue (front() and back()), and those that modify the queue (push(), pop() and emplace()). This interface has the same race conditions inherent in it as the stack interface discussed previously: the result of empty() or front() may be stale by the time you act on it. Consequently, you need to combine front() and pop() into a single function call, much as you combined top() and pop() for the stack. The code from listing 1 adds a new nuance, though: when using a queue to pass data between threads, the receiving thread often needs to wait for the data. Let’s provide two variants on pop(): try_pop(), which tries to pop the value from the queue but always returns immediately (with an indication of failure) even if there wasn’t a value to retrieve, and wait_and_pop(), which waits until there’s a value to retrieve. If you take your lead for the signatures from the stack example, your interface looks like the following.

Listing 3 The interface of your threadsafe_queue

 
#include <memory>                               ①
template<typename T>
class threadsafe_queue
{
public:
    threadsafe_queue();
    threadsafe_queue(const threadsafe_queue&);
    threadsafe_queue& operator=(
        const threadsafe_queue&) = delete;      ②
    void push(T new_value);
    bool try_pop(T& value);                     ❶ 
    std::shared_ptr<T> try_pop();               ❷ 
    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    bool empty() const;
};
 

① For std::shared_ptr

② Disallow assignment for simplicity

As you did for the stack, you’ve cut down on the constructors and eliminated assignment to simplify the code. You’ve also provided two versions of both try_pop() and wait_and_pop(), as before. The first overload of try_pop() ❶   stores the retrieved value in the referenced variable, and it can use the return value for status; it returns true if it retrieved a value and false otherwise. The second overload ❷   can’t do this, because it returns the retrieved value directly. But the returned pointer can be set to NULL if there’s no value to retrieve.

How does all this relate to listing 1? You can extract the code for push() and wait_and_pop() from there, as shown in the next listing.

Listing 4 Extracting push() and wait_and_pop() from listing 1

 
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
    std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }
};
threadsafe_queue<data_chunk> data_queue;     ❶ 
void data_preparation_thread()
{
    while(more_data_to_prepare())
    {
        data_chunk const data=prepare_data();
        data_queue.push(data);               ❷ 
    }
}
void data_processing_thread()
{
    while(true)
    {
        data_chunk data;
        data_queue.wait_and_pop(data);       ❸ 
        process(data);
        if(is_last_chunk(data))
            break;
    }
}
 
 

The mutex and condition variable are now contained within the threadsafe_queue instance, so separate variables are no longer required ❶  , and no external synchronization is required for the call to push() ❷  . Also, wait_and_pop() takes care of the condition variable wait ❸  .

The other overload of wait_and_pop() is now trivial to write. The final queue implementation is shown here.

Listing 5 Full class definition for a thread-safe queue using condition variables

 
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;      ❶ 
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    threadsafe_queue(threadsafe_queue const& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue=other.data_queue;
    }
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value=data_queue.front();
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
 

❶   The mutex must be mutable

Even though empty() is a const member function, and the other parameter to the copy constructor is a const reference, other threads may have non-const references to the object and call mutating member functions, so you still need to lock the mutex. Because locking a mutex is a mutating operation, the mutex object must be marked mutable ❶   so it can be locked in empty() and in the copy constructor.
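The same pattern applies to any class that locks a mutex in a const member function. A minimal sketch (the counter class here is illustrative, not part of the queue):

```cpp
#include <mutex>

class counter
{
    mutable std::mutex m;   // without mutable, locking m in the const
                            // member get() would not compile, because
                            // locking mutates the mutex object
    int value = 0;
public:
    void increment()
    {
        std::lock_guard<std::mutex> lk(m);
        ++value;
    }
    int get() const         // const, yet still needs to lock
    {
        std::lock_guard<std::mutex> lk(m);
        return value;
    }
};
```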

Condition variables are also useful when there’s more than one thread waiting for the same event. If the threads are used to divide the workload, only one thread should respond to a notification; the structure shown in listing 1 can be used unchanged, just running multiple instances of the data-processing thread. When new data is ready, the call to notify_one() triggers one of the threads currently executing wait() to check its condition and return from wait() (because you’ve added an item to the data_queue). There’s no guarantee as to which thread will be notified, or even whether there’s a thread waiting to be notified; all the processing threads might still be busy processing data.

Another possibility is that several threads are waiting for the same event, and all of them need to respond. This can happen where shared data is being initialized, and the processing threads can all use the same data, but need to wait for it to be initialized. This can also happen where the threads need to wait for an update to shared data, such as a periodic reinitialization. In these cases, the thread preparing the data can call the notify_all() member function on the condition variable rather than notify_one(). As the name suggests, this causes all the threads currently executing wait() to check the condition they’re waiting for.

If the waiting thread is going to wait only once, so that when the condition is true it will never wait on this condition variable again, a condition variable might not be the best choice of synchronization mechanism. This is especially true if the condition being waited for is the availability of a particular piece of data. In this scenario, a future might be more appropriate.
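For such a one-shot event carrying a piece of data, a std::promise/std::future pair replaces the mutex, flag, and condition variable entirely. A minimal sketch (the function name produce_and_wait is illustrative):

```cpp
#include <future>
#include <string>
#include <thread>

std::string produce_and_wait()
{
    std::promise<std::string> p;
    std::future<std::string> f = p.get_future();
    std::thread producer([&p]{
        p.set_value("data");      // one-shot notification with a payload
    });
    std::string result = f.get(); // blocks until set_value(); no mutex,
                                  // no flag, no spurious wakes to handle
    producer.join();
    return result;
}
```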

That’s all for this article. For more, download the free first chapter of C++ Concurrency in Action, Second Edition and see this Slideshare presentation.