[zeromq-dev] Async::Worker, C++ task offloading.

Oliver Smith oliver at kfs.org
Thu Jul 22 06:20:40 CEST 2010


I discovered ZeroMQ through a friend after being disappointed that 
Intel's "Threading Building Blocks" doesn't mean "threads" in the sense 
you or I might expect, but is rather a "get every last ounce out of your 
Intel CPU with hyper parallelism" system.

What I was looking for was a convenient C++ way of creating 
work-splitting tasks and work-delegating tasks. Having cut my teeth on 
ZeroMQ, I immediately set about making a worker pool system. The result 
is Async::Worker.

Download: http://www.kfs.org/async/worker/
Documentation: http://www.kfs.org/async/worker/manual/

It combines concepts from OpenMP and Intel's TBB.

However, one important note about this implementation: the messages I 
pass are pointers to objects rather than the objects themselves. As it 
stands, it is not intended for inter-process or inter-machine 
communication.

The main classes are:
  Async::FireAndForget, a base class for workloads that can be executed 
by a worker and then destroyed, such as writing to a log file, etc.
  Async::RunAndReturn, a base class for workloads that do reductions or 
need completion back in the master thread.

RunAndReturn is somewhat along the lines of #pragma omp task. The Work() 
member is executed by an available worker thread and the pointer is sent 
back to the master thread, which executes the Result() member when you 
get around to calling Async::GetResults().

So, consider the following:

typedef std::vector<int>  Numbers ;

struct NumberCruncher : public Async::RunAndReturn
{
   NumberCruncher(Numbers::iterator startRange, Numbers::iterator endRange, int* finalResultHere)
    : mStartRange(startRange), mEndRange(endRange), mFinalResultHere(finalResultHere), mPrivateSum(0) {}

   virtual void Work() const
   {
     // This gets executed in parallel by a worker thread.
     for ( Numbers::iterator it = mStartRange ; it != mEndRange ; ++it )
       mPrivateSum += *it ;
   }

   virtual void Result()
   {
     // This gets executed in serial by whoever calls Async::GetResults().
     *mFinalResultHere += mPrivateSum ;
   }

 private:
   Numbers::iterator mStartRange, mEndRange ;
   int*              mFinalResultHere ;
   mutable int       mPrivateSum ; // mutable: Work() is const but accumulates here.
} ;

int main(int argc, char* argv[])
{
   Numbers numbers(200000000) ;
   for ( size_t i = 0 ; i < numbers.size() ; ++i )
     numbers[i] = rand() ;

   int total = 0 ;
   Numbers::iterator endChunk = numbers.begin() ;
   do
   {
     // Each task we dispatch will be to add a block
     // of up to 8192 numbers.
     Numbers::iterator startChunk = endChunk ;
     endChunk = ( numbers.end() - startChunk > 8192 ) ? startChunk + 8192 : numbers.end() ;

     Async::Queue(new NumberCruncher(startChunk, endChunk,&total)) ;
   }
   while ( endChunk != numbers.end() ) ;

   // Start receiving and adding the results.
   Async::GetResults() ;

   printf("Sum of our %zu numbers = %d\n", numbers.size(), total) ;
}

This is a somewhat weak example because the work done by each worker is 
so trivial, but even so, on a virtual quad-core machine building with 
-O0 I see a 35-40% reduction in processing time.

I ran a second test with a ridiculously complicated formula involving 
int-to-float, float-to-double, and double-to-bool conversions, sin, cos, 
abs, modulo, etc., and benchmarked running the array in serial vs parallel.

Serial: 6784 milliseconds
Parallel: 1401 milliseconds

And that's without any effort to take things like cache alignment, cache 
lines etc into account, or any effort to tweak the number of tasks per 
workload.

I hope some of you will find this useful.

If you want to provide feedback or ask questions, either follow up here, 
write to my email address, or comment on the blog post linked from the 
manual page.

Happy threading :)


- Oliver



