Having watched Herb Sutter’s C++ Concurrency video, I wanted to try out a few of the techniques for myself. The first step was to write a simple synchronised queue, which he left as an exercise for the reader. The key feature is that pop() blocks until an element is pushed into the queue – then it returns the element. This turns out to be pretty succinct using C++11 features like std::mutex and std::condition_variable:
namespace musingstudio
{
template<typename T>
class SynchronizedQueue
{
std::deque<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_wait_for_non_empty;
public:
// When an element of T is pushed onto the queue,
// one caller waiting in a pop() call will be notified
void push( const T& t )
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push_back(t);
m_wait_for_non_empty.notify_one();
}
// Calls to pop() will block until an element of T is
// pushed onto the queue
T pop()
{
std::unique_lock<std::mutex> lock(m_mutex);
while(m_queue.empty())
{
m_wait_for_non_empty.wait(lock);
}
T tmp(m_queue.front());
m_queue.pop_front();
return tmp;
}
};
}
and here’s some code to exercise it:
void testConcurrentQueue()
{
musingstudio::SynchronizedQueue<int> elements;
std::thread pusher([&]()
{
for (int i = 0; i < 5; ++i)
{
wait();
std::cout << "Pushing " << i << '\n';
elements.push(i);
}
});
std::thread popper([&]()
{
for (int j = 0; j < 5; ++j )
{
int popped = elements.pop();
std::cout << "Popped " << popped << "\n";
}
});
pusher.join();
popper.join();
}
Output:

The next item that caught my eye was a template class that wraps an instance of T so that access to it becomes transactional across threads. No need to explicitly take a lock for each group of calls to the object – instead, you express each transaction on on the instance of T as a lambda. A mutex blocks and the command (expressed as a lambda) is executed in the calling thread. Herb called his example Monitor<T>, but I preferred Sequential<T> as a partner to Concurrent<T> (see below). Also, I replaced operator() with excute() (in my opinion it’s easier to read). Typical use looks like this:
Sequential<T> t( ... );
t.execute([&](T& u)
{
/* perform multiple operations in this lambda as one synchronised transaction*/
});
So much for the context – here’s the implementation:
namespace musingstudio
{
template<typename T>
class Sequential
{
mutable T m_t;
mutable std::mutex m_mutex;
public:
Sequential( T t ) : m_t( t )
{
}
template<typename F>
auto execute( F f ) const -> decltype(f(m_t))
{
std::unique_lock<std::mutex> lock(m_mutex);
return f(m_t);
}
};
}
And here’s the code in action, using Sequential<ostream&> to synchronise calls to std::cout:
void testSequential()
{
musingstudio::Sequential<std::ostream&> sync_cout( std::cout );
auto doPush = [&]()
{
for ( int i = 0; i < 10; ++i )
{
sync_cout.execute([&](std::ostream& os)
{
os << i << i << i << i << i << "\n";
});
}
};
std::thread thread1(doPush);
std::thread thread2(doPush);
thread1.join();
thread2.join();
}
Output:

Now that we’ve got SynchronizedQueue<T> and Sequential<T>, here’s Concurrent<T> which provides a way to perform a series of synchronised operations on some object in parallel with the activity on the main thread. For example, if you need to keep a GUI thread responsive. I love the idea of pushing a “Done” event onto the message queue in the destructor so that queued work is concluded and then Concurrent<T> returns. This is also a very nice use for std::future and std::promise – allow the caller to keep the return value as a future, but don’t block until it’s needed.
template<typename T>
class Concurrent
{
mutable T m_t;
mutable SynchronizedQueue<std::function<void()>> m_queue;
bool m_done;
std::thread m_worker;
// Assign value to the promise where there's a
// non-trivial return type
template<typename Ret, typename Ftn, typename T>
void setValue( std::promise<Ret>& promise, Ftn& f, T& t ) const
{
promise.set_value( f(t) );
}
// Assign void to the promise - trivial void return type
template<typename Ftn, typename T>
void setValue( std::promise<void>& promise, Ftn& f, T& t ) const
{
f(t);
promise.set_value();
}
public:
Concurrent( T t ) : m_t(t), m_done(false),
m_worker( [=](){ while(!this->m_done){ m_queue.pop()(); }} )
{}
~Concurrent()
{
m_queue.push( [=]{ this->m_done = true; } );
m_worker.join();
}
// In order to return a value from the operation that we
// process on another thread, use async, promises and futures
// - we can't just return the calculated value,
// because then the caller would have to block.
template<typename F>
auto execute( F f ) const -> std::future<decltype(f(m_t))>
{
auto promise =
std::make_shared<std::promise<decltype(f(m_t))>>();
auto return_value = promise->get_future();
m_queue.push( [=]()
{
try
{
setValue( *promise, f, m_t );
}
catch(...)
{ promise->set_exception( std::current_exception() ); }
});
return return_value;
}
};
Here’s some code to exercise Concurrent<T>:
void testConcurrentReturningFunction()
{
musingstudio::Concurrent<std::string> words("Concurrent<T> - ");
std::vector<std::future<std::string>> values;
// Set off the calculations in a worker thread, storing future return values
for ( size_t i = 0; i < 10; ++i )
{
values.push_back(
words.execute( [=]( std::string& in )
{
in += std::to_string(i);
return in;
}) );
}
// Now collection the return values and display them
std::for_each( values.begin(), values.end(), [](std::future<std::string>& f)
{
std::cout << f.get() << "\n";
});
}
Output:
