Concurrency with C++11

Having watched Herb Sutter’s C++ Concurrency video, I wanted to try out a few of the techniques for myself. The first step was to write a simple synchronised queue, which he left as an exercise for the reader.  The key feature is that pop() blocks until an element is pushed into the queue – then it returns the element.  This turns out to be pretty succinct using C++11 features like std::mutex and std::condition_variable:

#include <condition_variable>
#include <deque>
#include <mutex>

namespace musingstudio
{
  template<typename T>
  class SynchronizedQueue
  {
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_wait_for_non_empty;

  public:
    // When an element of T is pushed onto the queue,
    // one caller waiting in a pop() call will be notified
    void push( const T& t )
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_queue.push_back(t);
      m_wait_for_non_empty.notify_one();
    }

    // Calls to pop() will block until an element of T is 
    // pushed onto the queue
    T pop()
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      while(m_queue.empty())
      {
        m_wait_for_non_empty.wait(lock);
      }
      T tmp(m_queue.front());
      m_queue.pop_front();
      return tmp;
    }
  };
}

and here’s some code to exercise it:

void testConcurrentQueue()
{
  musingstudio::SynchronizedQueue<int> elements;

  std::thread pusher([&]()
  {
    for (int i = 0; i < 5; ++i)
    {
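      // wait() isn't defined in this post; assuming it was a short
      // pause, sleep_for (from <thread> and <chrono>) stands in for it
      std::this_thread::sleep_for(std::chrono::milliseconds(100));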
      std::cout << "Pushing " << i << '\n';
      elements.push(i);
    }
  });

  std::thread popper([&]()
  {
    for (int j = 0; j < 5; ++j )
    {
      int popped = elements.pop();
      std::cout << "Popped " << popped << "\n";
    }
  });

  pusher.join();
  popper.join();
}

Output:

[Image: SynchronizedQueueOutput]
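The while loop around wait() in pop() is there because a condition variable can wake up spuriously, so the emptiness check has to be repeated after every wake-up. The same logic can be written with the predicate overload of std::condition_variable::wait. Here's a minimal sketch of that variant; the names PredicateQueue, sumOfPopped and checkSumOfPopped are mine for illustration and don't come from Herb's talk:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical name: same behaviour as SynchronizedQueue<T>,
// but pop() uses the predicate form of wait().
template<typename T>
class PredicateQueue
{
  std::deque<T> m_queue;
  std::mutex m_mutex;
  std::condition_variable m_wait_for_non_empty;

public:
  void push( const T& t )
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push_back(t);
    m_wait_for_non_empty.notify_one();
  }

  T pop()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    // Equivalent to: while (m_queue.empty()) wait(lock);
    m_wait_for_non_empty.wait(lock, [this]{ return !m_queue.empty(); });
    T tmp(m_queue.front());
    m_queue.pop_front();
    return tmp;
  }
};

// Pushes 0..4 from one thread and sums what a second thread pops.
int sumOfPopped()
{
  PredicateQueue<int> elements;
  int sum = 0;
  std::thread pusher([&]{ for (int i = 0; i < 5; ++i) elements.push(i); });
  std::thread popper([&]{ for (int j = 0; j < 5; ++j) sum += elements.pop(); });
  pusher.join();
  popper.join();
  return sum;
}

// Every pushed element is popped exactly once, so the sum is 0+1+2+3+4.
void checkSumOfPopped()
{
  assert(sumOfPopped() == 10);
}
```

Whichever form you use, the emptiness test always runs with the mutex held, which is what makes it safe against a push() arriving between the check and the wait.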

The next item that caught my eye was a template class that wraps an instance of T so that access to it becomes transactional across threads. There’s no need to explicitly take a lock for each group of calls to the object; instead, you express each transaction on the instance of T as a lambda. The wrapper locks a mutex and then executes the lambda in the calling thread, so transactions from different threads run one at a time. Herb called his example Monitor<T>, but I preferred Sequential<T> as a partner to Concurrent<T> (see below). Also, I replaced operator() with execute() (in my opinion it’s easier to read). Typical use looks like this:

Sequential<T> t( ... ); 
t.execute([&](T& u)
{  
  /* perform multiple operations in this lambda as one synchronised transaction*/  
});

So much for the context – here’s the implementation:

namespace musingstudio
{
  template<typename T>
  class Sequential
  {
    mutable T m_t;
    mutable std::mutex m_mutex;
  public:
    Sequential( T t ) : m_t( t )
    {
    }
    template<typename F>
    auto execute( F f ) const -> decltype(f(m_t))
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      return f(m_t);
    }
  };
}

And here’s the code in action, using Sequential<ostream&> to synchronise calls to std::cout:

void testSequential()
{
  musingstudio::Sequential<std::ostream&> sync_cout( std::cout );
  auto doPush = [&]() 
  {
    for ( int i = 0; i < 10; ++i )
    {
      sync_cout.execute([&](std::ostream& os)
      {
        os << i << i << i << i << i << "\n";
      });
    }
  };
  std::thread thread1(doPush);
  std::thread thread2(doPush);
  thread1.join();
  thread2.join();
}

Output:

[Image: SequentialOutput]
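One caveat: with T as std::ostream&, the member declaration mutable T m_t becomes a mutable reference member, which the standard doesn't allow (Visual Studio accepts it, Clang rejects it). Here's a sketch of one way round that, assuming you're happy for execute() to be non-const so that neither member needs mutable. The sketch namespace and the writeLines and checkWriteLines functions are my own names for illustration:

```cpp
#include <cassert>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

namespace sketch  // hypothetical namespace, to keep this apart from the code above
{
  template<typename T>
  class Sequential
  {
    T m_t;               // no 'mutable', so T may be a reference type
    std::mutex m_mutex;
  public:
    Sequential( T t ) : m_t( t ) {}

    // Non-const, because the wrapped object is modified through execute()
    template<typename F>
    auto execute( F f ) -> decltype(f(m_t))
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      return f(m_t);
    }
  };
}

// Two threads each write five complete lines to a shared stream.
std::string writeLines()
{
  std::ostringstream out;
  sketch::Sequential<std::ostream&> sync_out( out );
  auto writer = [&]()
  {
    for ( int i = 0; i < 5; ++i )
    {
      sync_out.execute([&](std::ostream& os) { os << i << i << i << "\n"; });
    }
  };
  std::thread thread1(writer);
  std::thread thread2(writer);
  thread1.join();
  thread2.join();
  return out.str();
}

// Ten lines of three digits plus a newline each.
void checkWriteLines()
{
  assert(writeLines().size() == 40);
}
```

The trade-off is that a const Sequential<T> can no longer be used, which may or may not matter depending on how the wrapper is passed around.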

Now that we’ve got SynchronizedQueue<T> and Sequential<T>, here’s Concurrent<T>, which provides a way to perform a series of synchronised operations on some object in parallel with the activity on the main thread, for example when you need to keep a GUI thread responsive.  I love the idea of pushing a “Done” event onto the message queue in the destructor so that queued work is concluded before the destructor of Concurrent<T> returns.  This is also a very nice use for std::future and std::promise – allow the caller to keep the return value as a future, but don’t block until it’s needed.

#include <functional>
#include <future>
#include <memory>
#include <thread>

namespace musingstudio
{
  template<typename T>
  class Concurrent
  {
    mutable T m_t;
    mutable SynchronizedQueue<std::function<void()>> m_queue;
    bool m_done;
    // Declared last so that the members above are initialised
    // before the worker thread starts using them
    std::thread m_worker;
    // Assign value to the promise where there's a
    // non-trivial return type
    // (the parameter is named U because redeclaring T, the class
    // template parameter, is ill-formed)
    template<typename Ret, typename Ftn, typename U>
    void setValue( std::promise<Ret>& promise, Ftn& f, U& t ) const
    {
      promise.set_value( f(t) );
    }
    // Assign void to the promise - trivial void return type
    template<typename Ftn, typename U>
    void setValue( std::promise<void>& promise, Ftn& f, U& t ) const
    {
      f(t);
      promise.set_value();
    }
  public:
    Concurrent( T t ) : m_t(t), m_done(false),
      m_worker( [this](){ while(!this->m_done){ m_queue.pop()(); }} )
    {}
    ~Concurrent()
    {
      // The "Done" message runs on the worker after all queued work
      m_queue.push( [this]{ this->m_done = true; } );
      m_worker.join();
    }
    // In order to return a value from the operation that we
    // process on another thread, use promises and futures
    // - we can't just return the calculated value,
    // because then the caller would have to block.
    template<typename F>
    auto execute( F f ) const -> std::future<decltype(f(m_t))>
    {
      auto promise =
        std::make_shared<std::promise<decltype(f(m_t))>>();
      auto return_value = promise->get_future();
      m_queue.push( [=]()
      {
        try
        {
          setValue( *promise, f, m_t );
        }
        catch(...)
        { promise->set_exception( std::current_exception() ); }
      });
      return return_value;
    }
  };
}

Here’s some code to exercise Concurrent<T>:

void testConcurrentReturningFunction()
{
  musingstudio::Concurrent<std::string> words("Concurrent<T> - ");
  std::vector<std::future<std::string>> values;
  // Set off the calculations in a worker thread, storing future return values
  for ( size_t i = 0; i < 10; ++i )
  {
    values.push_back( 
      words.execute( [=]( std::string& in )
      {
        in += std::to_string(i);
        return in;
      }) );
  }
  // Now collect the return values and display them
  std::for_each( values.begin(), values.end(), [](std::future<std::string>& f)
  {
    std::cout << f.get() << "\n";
  });
}

Output:

[Image: ConcurrentOutput]
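To see the promise/future handoff on its own, without the queue around it, here's a minimal sketch. One thread fulfils a std::promise, either with a value or with a captured exception, and the caller only blocks when it calls get() on the matching std::future. The function names answerViaPromise, exceptionIsForwarded and checkHandoff are hypothetical, chosen just for this example:

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>

// The worker sets a value; the caller blocks only inside get().
int answerViaPromise()
{
  std::promise<int> promise;
  std::future<int> result = promise.get_future();
  std::thread worker([&promise]{ promise.set_value(6 * 7); });
  int value = result.get();
  worker.join();
  return value;
}

// The worker fails; the exception is stored in the promise and
// rethrown on the calling thread by get().
bool exceptionIsForwarded()
{
  std::promise<int> promise;
  std::future<int> result = promise.get_future();
  std::thread worker([&promise]
  {
    try
    {
      throw std::runtime_error("failed on worker");
    }
    catch(...)
    {
      promise.set_exception( std::current_exception() );
    }
  });
  bool caught = false;
  try
  {
    result.get();
  }
  catch(const std::runtime_error&)
  {
    caught = true;
  }
  worker.join();
  return caught;
}

void checkHandoff()
{
  assert(answerViaPromise() == 42);
  assert(exceptionIsForwarded());
}
```

Here the promise lives on the caller's stack because the caller joins the worker before returning. In Concurrent<T> the promise has to outlive the call to execute() and the lambda has to be copyable into a std::function, which is why it is held in a std::shared_ptr there.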

7 responses to “Concurrency with C++11”

  1. Jagannath

    Nice work Steve.

  2. Alastair

    I, like you, was very interested in Herb’s talk. When I had a go at recreating some of his code (as per your Sequential class) on my compiler (Xcode V4.5.2 using Apple LLVM 4.1) it resulted in a compilation error:

    mutable T t;
    ‘mutable’ cannot be applied to references

    I have just tried your code on my work compiler (VS 2012) and all seems fine.

    I am wondering which is wrong?

    [I may well also ask this on Stackoverflow as given Herb’s other talk on const and mutable I think it’s pretty relevant]

    • So you’re saying that the other compiler error’d when instantiating the template with T as a reference (e.g. ostream&)? I don’t see why that should be disallowed. Here’s a daft Counter class that increments a mutable integer stored by reference (builds and runs under VS2010):

      #include "stdafx.h"
      #include <iostream>

      class Counter
      {
        mutable int& m_i;
      public:
        Counter(int& i) : m_i(i){}
        void Increment() const { ++m_i; }
      };

      int _tmain(int argc, _TCHAR* argv[])
      {
        int count = 0;
        Counter counter(count);
        counter.Increment();
        std::cout << count << "\n";

        return 0;
      }

  3. Alastair

    @Steve Bower Yes, instantiating the template with an ostream& as per Herb’s example (and your code above) causes issues compiling with Clang.

    I tend to believe Clang more than VS2012/10 and this Stackoverflow post confirms that:

    http://stackoverflow.com/questions/8469209/why-cant-i-declare-a-reference-to-a-mutable-object-reference-cannot-be-decla

    According to the standard: [7.1.1 para 8]:
    “The mutable specifier can be applied only to names of class data members (9.2) and cannot be applied to names declared const or static, and cannot be applied to reference members.”

  4. Alastair

    I think I may be mistaken as just tried your code with LiveWorkSpace and all is ok…

    http://liveworkspace.org/code/380Vg8$2

    Feel free to delete my comments!

  5. Alastair

    That said, if I remember to select “Clang 3.2” from the list of compilers I see the error (I thought I wasn’t going mad!)

    Compilation finished with errors:
    source.cpp:10:15: error: 'mutable' cannot be applied to references
    mutable T m_t;
    ^
    source.cpp:27:43: note: in instantiation of template class 'musingstudio::Sequential<std::basic_ostream<char> &>' requested here
    musingstudio::Sequential<std::ostream&> sync_cout( std::cout );
    ^
    1 error generated.

    • I think you’re right, the latest standard has the same paragraph in 7.1.1 paragraph 10: “The mutable specifier can be applied only to names of class data members (9.2) and cannot be applied to names declared const or static, and cannot be applied to reference members.”
