Condition Variables¶
Condition Variables¶
Condition Variable
The most basic communication device
Everything else can be built around it (and a mutex)
Semaphores
Events
Message queues
Promises and futures (⟶ later)
Condition Variables: By Example¶
#include <mutex>
#include <condition_variable>
#include <deque>
#include <thread>
#include <chrono>
#include <iostream>
// message queue which can hold up to maxelem elements of type
// T. associated with the queue are two wait conditions:
// * a consumer has to wait if the queue is empty. he then waits for
// the condition "not empty" which is made true by a producer.
// * a producer has to wait if the queue is full. he then waits for
// the condition "not full" which is made true by a consumer.
// note that std::condition_variable must be used with
// std::unique_lock<std::mutex>, which is a moveable version of a lock
// guard. this is to guarantee that the mutex remains locked when a
// thread enters the wait and an exception is thrown inside the
// std::condition_variable
// implementation. std::condition_variable::wait(guard) moves the
// guard into the condition variable before suspending the thread, and
// moves it back out to the caller again after wakeup.
template <typename T> class MessageQueue
{
public:
MessageQueue(size_t maxelem) : maxelem_(maxelem) {}
void push(T elem)
{
{
std::unique_lock<std::mutex> guard(lock_);
// wait if the queue is full. (use while instead of if to
// guard against spurious wakeups.)
while (q_.size() == maxelem_)
// atomically release the mutex and put the calling
// thread to sleep until the condition becomes true
// (not_full_ is notified).
not_full_.wait(guard);
q_.push_back(elem);
}
// notify a waiting consumer that there might is an item
// available.
not_empty_.notify_one();
}
T pop()
{
T rv;
{
std::unique_lock<std::mutex> guard(lock_);
// wait if the queue is empty. (use while instead of if to
// guard against spurious wakeups.)
while (q_.size() == 0)
// atomically release the mutex and put the calling
// thread to sleep until the condition becomes true
// (not_empty_ is notified).
not_empty_.wait(guard);
rv = q_.front();
q_.pop_front();
}
// notify a waiting producer that there is room.
not_full_.notify_one();
return rv;
}
private:
std::mutex lock_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
size_t maxelem_;
std::deque<T> q_;
};
void producer_func(MessageQueue<int>& mq, int nelem)
{
for (int i=0; i<nelem; i++) {
mq.push(i);
std::cout << "produced " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
void consumer_func(MessageQueue<int>& mq, int threadno, int nelem)
{
for (int i=0; i<nelem; i++) {
int elem = mq.pop();
std::cout << "consumer #" << threadno << " consumed " << elem << std::endl;
}
}
int main()
{
MessageQueue<int> mq(2);
std::thread producer(producer_func, std::ref(mq), 10);
std::thread consumers[5]{
std::thread(consumer_func, std::ref(mq), 0, 2),
std::thread(consumer_func, std::ref(mq), 1, 2),
std::thread(consumer_func, std::ref(mq), 2, 2),
std::thread(consumer_func, std::ref(mq), 3, 2),
std::thread(consumer_func, std::ref(mq), 4, 2),
};
for (auto& thread: consumers)
thread.join();
producer.join();
}
Danger
while
instead of if
⟶ Spurious Wakeups!
More Communication: Future¶
Problem:
Worker thread calculates something in the background
Somebody waits (synchronizes) for that something to become ready
That something will become ready in the future
Solution:
Condition variable and mutex
encapsulated in a “future” device.
Manually coded here for fun …
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>
// a Future can be thought of as a handle to something (of type T)
// that is not yet available (will be produced by a different thread).
// any thread that retrieves the element (using get()) will be
// suspended until the element is available.
// a thread that makes the element available wakes a possible waiter.
template <typename T> class Future
{
public:
Future() : is_ready_(false) {}
// if the element is available, it is returned
// immediately. otherwise, the caller is suspended until the
// element is available.
T get()
{
std::unique_lock<std::mutex> guard(lock_);
// wait(..., pred): does the bloody spurious wakeup loop
// inside
cond_ready_.wait(guard, [this]() { return this->is_ready_; });
return value_;
}
// make the element available.
void set(T value)
{
{
std::unique_lock<std::mutex> guard(lock_);
value_ = value;
is_ready_ = true;
}
cond_ready_.notify_one();
}
private:
std::mutex lock_;
bool is_ready_;
std::condition_variable cond_ready_;
T value_;
};
int main()
{
Future<int> future;
// this thread waits a couple of seconds and then produces the
// element
std::thread([&future]() {
// countdown
int i=5;
while (i) {
std::cout << i-- << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// fire
std::cout << "fire!" << std::endl;
future.set(666);
}).detach();
// the main thread immediately get()s the element - which is not
// likely to be available already.
std::cout << "waiting for future" << std::endl;
std::cout << "result: " << future.get() << std::endl;
}
std::promise
and std::future
¶
Same scenario, but different responsibilities
Somebody promises to have textit{something} ready in the future
Two objects …
std::promise
is used by producer (the one who promises)std::future
is used by consumer (who relies on the promise that has been made)
Best done by example
#include <future>
#include <thread>
#include <iostream>
int main()
{
// as promised *by worker*: will be ready in the future
std::promise<int> promise;
// a handle to the bright future; somebody can wait if he wants
std::future<int> future = promise.get_future();
std::thread([&promise]() {
// countdown
int i=5;
while (i) {
std::cout << i-- << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// fire
std::cout << "fire!" << std::endl;
promise.set_value(666);
}).detach();
std::cout << "waiting for future" << std::endl;
std::cout << "result: " << future.get() << std::endl;
}