Tag Archives: Concurrency

InterlockedIncrement

Raymond Chen blogged about the internals of InterlockedIncrement and InterlockedCompareExchange.

Leave a comment

Filed under C++, Programming

C++: Race conditions and Data races

According to this discussion on Stack Overflow, we should make a distinction between race conditions and data races. This blog post defines the terms and gives a neat example where the code is modified to exhibit zero, one or both behaviours.

Here’s an example of a race condition – the ordering of the execution affects the outcome (this can occur if one thread checks a value then performs an action, giving another thread the change to mutate the variable in between):

if (x == 5) // The "Check"
{
   y = x * 2; // The "Act"

   // If another thread changed x in between "if (x == 5)" and "y = x * 2" above,
   // y will not be equal to 10.
}

And here’s an example of a data race – a variable is mutated by multiple threads without synchronisation:

// Execute this function on five threads simultaneously
for ( int i = 0; i < 10000; i++ )
{
   x = x + 1; 
}

In both cases, use of a synchronisation object across all the threads involved (e.g. a mutex) would address the issue.

Leave a comment

Filed under C++, C++ Code, Programming

C++: Resumable functions, Async and Await

Interesting post from Jens Weller on the new concurrency features in C++ 11 and C++ 14. He gives examples of code written using standard library async, futures and “.then()” with lamdbas, compared to the simplicity of expressing the same functionality with the proposed resumable functions (that is, resumable and await as language keywords).

Whilst resumable and await won’t be part of the language until at least C++17, Weller points out that Microsoft may put implementations into Visual Studio well before that. Here’s the paper, N3722.

Leave a comment

Filed under C++, Programming

Beautiful Concurrency – Simon Peyton-Jones

This is a famous paper on concurrency and the benefits of functional programming. It’s obviously no coincidence that I’ve heard of the author in connection with F#.

For me, a beautiful program is one that is so simple and elegant that it obviously has no mistakes, rather than merely having no obvious mistakes

Simon claims that Haskell is the most beautiful language he knows, then introduces it via an example with side-effects, knowing that the result is quite ghastly to a newbie! He points out that

Being explicit about side effects reveals a good deal of useful information. Consider two functions:
f :: Int -> Int
g :: Int -> IO Int
From looking only at their types we can see that f is a pure function: it has no side effects. In contrast, g has side effects, and this is apparent in its type. Each time g is performed it may give a different result.

I got lost in the middle of the Santa example, but I get the sentiment behind the design:

Since there are no programmer-visible locks, the questions of which locks to take, and in which order, simply do not arise. In particular, blocking (retry) and choice (orElse), which are fundamentally non-modular when expressed using locks, are fully modular in STM.

Leave a comment

Filed under Programming

Introduction to C++11 Concurrency

Marius Bancila wrote a neat introduction to C++11 concurrency.

Leave a comment

Filed under C++

Concurrency with C++11

Having watched Herb Sutter’s C++ Concurrency video, I wanted to try out a few of the techniques for myself. The first step was to write a simple synchronised queue, which he left as an exercise for the reader.  The key feature is that pop() blocks until an element is pushed into the queue – then it returns the element.  This turns out to be pretty succinct using C++11 features like std::mutex and std::condition_variable:

namespace musingstudio
{
  template<typename T>
  class SynchronizedQueue
  {
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_wait_for_non_empty;

  public:
    // When an element of T is pushed onto the queue,
    // one caller waiting in a pop() call will be notified
    void push( const T& t )
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_queue.push_back(t);
      m_wait_for_non_empty.notify_one();
    }

    // Calls to pop() will block until an element of T is 
    // pushed onto the queue
    T pop()
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      while(m_queue.empty())
      {
        m_wait_for_non_empty.wait(lock);
      }
      T tmp(m_queue.front());
      m_queue.pop_front();
      return tmp;
    }
  };
}

and here’s some code to exercise it:

void testConcurrentQueue()
{
  musingstudio::SynchronizedQueue<int> elements;

  std::thread pusher([&]()
  {
    for (int i = 0; i < 5; ++i)
    {
      wait();
      std::cout << "Pushing " << i << '\n';
      elements.push(i);
    }
  });

  std::thread popper([&]()
  {
    for (int j = 0; j < 5; ++j )
    {
      int popped = elements.pop();
      std::cout << "Popped " << popped << "\n";
    }
  });

  pusher.join();
  popper.join();
}

Output:

SynchronizedQueueOutput

The next item that caught my eye was a template class that wraps an instance of T so that access to it becomes transactional across threads. No need to explicitly take a lock for each group of calls to the object – instead, you express each transaction on on the instance of T as a lambda. A mutex blocks and the command (expressed as a lambda) is executed in the calling thread.  Herb called his example Monitor<T>, but I preferred Sequential<T> as a partner to Concurrent<T> (see below).  Also, I replaced operator() with excute() (in my opinion it’s easier to read).  Typical use looks like this:

Sequential<T> t( ... ); 
t.execute([&](T& u)
{  
  /* perform multiple operations in this lambda as one synchronised transaction*/  
});

So much for the context – here’s the implementation:

namespace musingstudio
{
  template<typename T>
  class Sequential
  {
    mutable T m_t;
    mutable std::mutex m_mutex;
  public:
    Sequential( T t ) : m_t( t )
    {
    }
    template<typename F>
    auto execute( F f ) const -> decltype(f(m_t))
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      return f(m_t);
    }
  };
}

And here’s the code in action, using Sequential<ostream&> to synchronise calls to std::cout:

void testSequential()
{
  musingstudio::Sequential<std::ostream&> sync_cout( std::cout );
  auto doPush = [&]() 
  {
    for ( int i = 0; i < 10; ++i )
    {
      sync_cout.execute([&](std::ostream& os)
      {
        os << i << i << i << i << i << "\n";
      });
    }
  };
  std::thread thread1(doPush);
  std::thread thread2(doPush);
  thread1.join();
  thread2.join();
}

Output:

SequentialOutput

Now that we’ve got SynchronizedQueue<T> and Sequential<T>, here’s Concurrent<T> which provides a way to perform a series of synchronised operations on some object in parallel with the activity on the main thread.  For example, if you need to keep a GUI thread responsive.  I love the idea of pushing a “Done” event onto the message queue in the destructor so that queued work is concluded and then Concurrent<T> returns.  This is also a very nice use for std::future and std::promise – allow the caller to keep the return value as a future, but don’t block until it’s needed.

template<typename T>
class Concurrent
{
  mutable T m_t;
  mutable SynchronizedQueue<std::function<void()>> m_queue;
  bool m_done;
  std::thread m_worker;
  // Assign value to the promise where there's a 
  // non-trivial return type
  template<typename Ret, typename Ftn, typename T>
  void setValue( std::promise<Ret>& promise, Ftn& f, T& t ) const
  {
    promise.set_value( f(t) );
  }
  // Assign void to the promise - trivial void return type
  template<typename Ftn, typename T>
  void setValue( std::promise<void>& promise, Ftn& f, T& t ) const
  {
    f(t);
    promise.set_value();
  }
public:
  Concurrent( T t ) : m_t(t), m_done(false), 
    m_worker( [=](){ while(!this->m_done){ m_queue.pop()(); }} )
  {}
  ~Concurrent()
  {
    m_queue.push( [=]{ this->m_done = true; } );
    m_worker.join();
  }
  // In order to return a value from the operation that we 
  // process on another thread, use async, promises and futures 
  // - we can't just return the calculated value,
  // because then the caller would have to block.
  template<typename F>
  auto execute( F f ) const -> std::future<decltype(f(m_t))>
  {
    auto promise = 
      std::make_shared<std::promise<decltype(f(m_t))>>();
    auto return_value = promise->get_future();
    m_queue.push( [=]()
    { 
      try
      {
        setValue( *promise, f, m_t );
      }
      catch(...)
      { promise->set_exception( std::current_exception() ); }
    });
    return return_value;
 }
};

Here’s some code to exercise Concurrent<T>:

void testConcurrentReturningFunction()
{
  musingstudio::Concurrent<std::string> words("Concurrent<T> - ");
  std::vector<std::future<std::string>> values;
  // Set off the calculations in a worker thread, storing future return values
  for ( size_t i = 0; i < 10; ++i )
  {
    values.push_back( 
      words.execute( [=]( std::string& in )
      {
        in += std::to_string(i);
        return in;
      }) );
  }
  // Now collection the return values and display them
  std::for_each( values.begin(), values.end(), [](std::future<std::string>& f)
  {
    std::cout << f.get() << "\n";
  });
}

Output:

ConcurrentOutput

7 Comments

Filed under C++ Code

Herb Sutter Video – C++ Concurrency

Another excellent video from Herb Sutter, this time on C++ Concurrency. The Monitor class is based on his Wrapper pattern
20130118-133301.jpg
and allows the caller to group related calls into a transaction, with synchronisation supplied via a mutex.
20130118-133722.jpg
The Concurrent class replaces the mutex with a concurrent_queue to avoid the caller blocking whilst the tasks complete.
20130118-133609.jpg
What’s especially elegant is ~Concurrent which pushes a done function onto message queue in order to signal not to wait for any more tasks (the concurrent_queue.pop() is assumed to wait until the next task is pushed). The done data member is not required to be atomic because it is only mutated on the worker thread, never on the calling thread.

1 Comment

Filed under C++, Video