[zeromq-dev] Futures

Oliver Smith oliver at kfs.org
Wed Aug 4 23:26:22 CEST 2010


Something that might attract a lot of developers to ZeroMQ is an 
easy-to-access Futures concept.

For those who don't know, a Future is a promise that a result will be 
available by the time you need to use it.

In C++ you might write something like:

     Future<int> i(calculationFunction, param1, param2) ;
     ... do other stuff ...
     // Waits for calculationFunction to return so that 'i' can be instantiated.
     return x + i ;
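
For comparison, here is a minimal sketch of the same pattern using 
std::async and std::future from the C++11 standard library, with no 
ZeroMQ involved. The bodies of calculationFunction and 
someOtherCalculation, and the name sumWithFuture, are placeholders 
invented for illustration.

```cpp
#include <cassert>
#include <future>

// Hypothetical stand-ins for the real calculation and the "other stuff".
int calculationFunction(int a, int b) { return a * b; }
int someOtherCalculation() { return 10; }

int sumWithFuture(int param1, int param2)
{
    // Start the calculation on another thread; 'i' is a promise of an int.
    std::future<int> i = std::async(std::launch::async,
                                    calculationFunction, param1, param2);

    int x = someOtherCalculation();   // ... do other stuff ...

    assert(i.valid());                // the result has not been consumed yet
    return x + i.get();               // blocks until the result is ready
}
```

Calling sumWithFuture(6, 7) overlaps the two calculations and only waits 
at the final addition.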

In ZeroMQ I envisage a "future_t" which is a superset of message_t:

     socket_t socket(ZMQ_REQ) ;
     socket.bind(futureWorkerUrl) ;

     int i ;
#ifdef VARIANT1
     future_t future(
                       calculationFunction  //!< Function to invoke.
                     , &params              //!< Parameter data.
                     , sizeof(params)       //!< Size of param data.
                     , &i                   //!< Destination.
                     , sizeof(i)            //!< Size of destination.
                     ) ;
     socket.send(future, 0) ;
#else
     future_t future(
                       socket               //!< So it can track itself.
                     , calculationFunction  //!< Function to invoke.
                     , &params              //!< Parameter data.
                     , sizeof(params)       //!< Size of param data.
                     , &i                   //!< Destination.
                     , sizeof(i)            //!< Size of destination.
                     ) ;
     future.send() ;
#endif

     int x = someOtherCalculation() ;
     future.recv() ;
     return x + i ;

This would lend itself to templates, so that C++/C#/Java etc could 
declare implicit futures:

template<typename _ResultType>
class Future
{
public:
     Future(zmq::socket_t& socket_, zmq::future_func_t function_,
            const void* params_, size_t paramsSize_)
         : future(socket_, function_, params_, paramsSize_,
                  &result, sizeof(result))
         {
         }

public:
     operator _ResultType & ()
     {
         future.recv() ;            // Force data to be received.
         return result ;
     }

private:
     zmq::future_t future ;
     _ResultType result ;
} ;

int someFunction()
{
     struct ComplexResult { int i ; float j ; char description[64] ; } ;
     struct ComplexArg { size_t count, depth ; double seed ; } ;

     ComplexArg args = { 99, 100, 1.234 } ;
     Future<ComplexResult> complexFuture(socket, calculationFunction, &args, sizeof(args)) ;

     int x = ...
     /* lots of other processing here */

     // Implicit conversion to ComplexResult& blocks until the result arrives.
     ComplexResult& result = complexFuture ;
     printf("got result: %s\n", result.description) ;
     return x + (int)(result.i * result.j) ;
}
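
The implicit-conversion idea can be tried out today without any ZeroMQ 
support. The sketch below is only an analogue: std::future stands in for 
the proposed zmq::future_t, and the names LazyResult, complexCalculation 
and someFunctionSketch are made up for this example. It assumes the 
result type is default-constructible.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Wraps a std::future and converts implicitly to the result,
// blocking on first use and caching the value afterwards.
template <typename ResultType>
class LazyResult
{
public:
    explicit LazyResult(std::future<ResultType> pending)
        : pending_(std::move(pending)), result_() {}

    operator ResultType&()
    {
        if (pending_.valid())          // first use: wait, then cache
            result_ = pending_.get();
        return result_;
    }

private:
    std::future<ResultType> pending_;
    ResultType result_;
};

struct ComplexArg    { int count; int depth; };
struct ComplexResult { int i; float j; };

// Hypothetical worker function.
ComplexResult complexCalculation(ComplexArg a)
{
    return ComplexResult{ a.count + a.depth, 0.5f };
}

int someFunctionSketch()
{
    ComplexArg args = { 99, 100 };
    LazyResult<ComplexResult> complexFuture(
        std::async(std::launch::async, complexCalculation, args));

    int x = 1;                               // other processing goes here
    ComplexResult& result = complexFuture;   // blocks until the worker is done
    assert(result.i == args.count + args.depth);
    return x + static_cast<int>(result.i * result.j);
}
```

Because the value is cached, converting the same LazyResult a second 
time returns immediately instead of waiting again.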

But there's a gotcha, especially if you are using multiple Futures 
concurrently or in the same function:

1. You either have to create a socket per future, which means additional 
setup/teardown time, or
2. You have to manually enforce the order in which you consume the results.

Based on my weekend experience with trying to implement parallel sort, 
it seems to me that there needs to be a mechanism for tagging messages 
with tokens so that (a) an arbitrary call to recv() can associate the 
resulting data with something meaningful, or (b) you can retrieve a 
specific message from the queue.

Then the future_t could be implemented in terms of an object which does 
something like:

struct future_t
{
     zmq::socket_t& socket ;    // Reference: sockets cannot be copied.
     zmq::token_t token ;
     void* data ;
     size_t dataSize ;

     // func_ is not used yet in this sketch.
     future_t(zmq::socket_t& socket_, zmq::future_func_t func_,
              const void* params_, size_t paramsSize_,
              void* data_, size_t dataSize_)
         : socket(socket_), data(data_), dataSize(dataSize_)
         {
             // Ask the socket which token the next send will carry.
             socket.getsockopt(ZMQ_GET_NEXT_SEND_TOKEN, &token) ;
             zmq::message_t msg(params_, paramsSize_, 0) ;
             socket.send(msg, 0) ;
         }

     bool recv()
     {
         // Only accept the reply that matches our token.
         socket.setsockopt(ZMQ_SET_NEXT_RECV_TOKEN, &token) ;
         zmq::message_t msg(data, dataSize, 0) ;
         return socket.recv(&msg, 0) ;
     }
} ;
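
To make the token idea concrete without the proposed socket options 
(which do not exist), here is a rough in-memory sketch of option (b): 
retrieving a specific reply regardless of arrival order. TokenMailbox, 
Token and tokenDemo are hypothetical names, and a std::deque stands in 
for the socket's incoming queue.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <utility>

typedef unsigned long Token;   // hypothetical token type

// In-memory stand-in for a socket whose replies may arrive in any order.
class TokenMailbox
{
public:
    TokenMailbox() : nextToken_(1) {}

    // Issue the token that tags the next request.
    Token nextSendToken() { return nextToken_++; }

    // Simulates the network delivering a reply tagged with its token.
    void deliver(Token token, int payload)
    {
        wire_.push_back(std::make_pair(token, payload));
    }

    // Fetch the reply for one specific token, parking any others seen
    // on the way. Returns false if that reply has not arrived yet.
    bool recv(Token wanted, int& out)
    {
        auto parked = parked_.find(wanted);
        if (parked != parked_.end()) {
            out = parked->second;
            parked_.erase(parked);
            return true;
        }
        while (!wire_.empty()) {
            std::pair<Token, int> msg = wire_.front();
            wire_.pop_front();
            if (msg.first == wanted) { out = msg.second; return true; }
            parked_[msg.first] = msg.second;   // keep for a later recv()
        }
        return false;
    }

private:
    Token nextToken_;
    std::deque<std::pair<Token, int> > wire_;
    std::map<Token, int> parked_;
};

int tokenDemo()
{
    TokenMailbox box;
    Token a = box.nextSendToken();
    Token b = box.nextSendToken();
    box.deliver(b, 20);        // replies arrive in the opposite order
    box.deliver(a, 10);

    int ra = 0, rb = 0;
    bool okA = box.recv(a, ra);   // skips over b's reply and parks it
    bool okB = box.recv(b, rb);   // served from the parked replies
    assert(okA && okB);
    return ra * 100 + rb;
}
```

With something like this underneath, several futures could share one 
socket and be consumed in any order, which avoids both gotchas above.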

Any thoughts?



