[zeromq-dev] Futures

Matt Weinstein matt_weinstein at yahoo.com
Wed Aug 4 23:38:38 CEST 2010


Isn't this just ZMQ_NOW and ZMQ_LATER ? :-)

Easy to do with the current model, and if you could leverage iostreams::

zmq_future_stream compute_this(functoid);

compute_this << args << endm;

.... sometime later ....

compute_this >> result1 >> result2;




On Aug 4, 2010, at 5:26 PM, Oliver Smith wrote:

> Something that might excite a lot of developers towards ZeroMQ is an
> easy-to-access Futures concept.
>
> For those who don't know, a Future is a promise to return a result  
> prior
> to your need to use it.
>
> In C++ you might write something like:
>
>     Future<int> i(calculationFunction, param1, param2) ;
>     ... do other stuff ...
>     return x + i ;        // Waits for the result of
> calculationFunction to return to instantiate 'i'.
>
> In ZeroMQ I envisage a "future_t" which is a super-set of message_t
>
>     socket_t socket(ZMQ_REQ) ;
>     socket.bind(futureWorkerUrl) ;
>
>     int i ;
> #ifdef VARIANT1
>     future_t future(
>                     calculationFunction  //!< Function to invoke.
>                     , &params               //!< parameter data
>                     , sizeof(params)       //!< size of param data.
>                     , &i                         //!< destination
>                     , sizeof(i)
>                     ) ;
>     socket.send(future, 0) ;
> #else
>     future_t future(
>                       socket                   //!< So it can track  
> itself.
>                     , calculationFunction//!< Function to invoke.
>                     , &params               //!< parameter data.
>                     , sizeof(params)       //!< size of param data.
>                     , &i                         //!< destination.
>                     , sizeof(i)
>                     ) ;
>     future.send() ;
> #endif
>
>     x = someOtherCalculation() ;
>     future.recv() ;
>     return x + i ;
>
> This would lend itself to templates, so that C++/C#/Java etc could
> declare implicit futures:
>
> template<typename _ResultType>
> class Future
> {
> public:
>     Future(zmq::socket_t& socket_
>               , zmq::future_func_t function_, const void* params_,
> size_t paramsSize_)
>         : future(socket_, function_, params_, paramsSize_, &result,
> sizeof(result))
>         {
>         }
>
> public:
>     operator _ResultType & ()
>     {
>         future.recv() ;            // Force data to be received.
>         return result ;
>     }
>
> private:
>     zmq::future_t future ;
>     _ResultType result ;
> } ;
>
> int someFunction()
> {
>     struct ComplexResult { int i ; float j ; char  
> description[64] ; } ;
>     struct ComplexArg { size_t count, depth ; double seed ; } ;
>
>     ComplexArg args = { 99, 100, 1.234 } ;
>     Future<ComplexResult> complexFuture(socket, &args, sizeof(args)) ;
>
>     int x = ...
>     /* lots of other processing here */
>
>     // De-referencing
>     ComplexResult& result = complexFuture ;
>     printf("got result: %s\n", result.description) ;
>     return x + (int)(result.i * result.j) ;
> }
>
> But there's a gotcha, especially if you are using multiple Futures
> concurrently or in the same function:
>
> 1. You either have to create a socket-per future, which means  
> additional
> tear up/down time, or;
> 2. You have to manually enforce the order in which you consume the  
> results.
>
> Based on my weekend experience with trying to implement parallel sort,
> it seems to me that there needs to be a mechanism for tokenizing
> messages so that a/ a random call to recv() can associate the  
> resulting
> data with something meaningful or b/ you can retrieve a specific  
> message
> from the queue.
>
> Then the future_t could be implemented in terms of an object which  
> does
> something like:
>
> struct future_t
> {
>     zmq::socket_t socket ;
>     zmq::token_t token ;
>     void* data ;
>     size_t dataSize ;
>
>     future_t(zmq::socket_t& socket_, zmq::future_func_t func_, const
> void* params_, size_t paramSize_, void* data_, size_t dataSize_)
>         : socket(socket_), data(data_), dataSize(dataSize_)
>         {
>             socket.getsockopt(ZMQ_GET_NEXT_SEND_TOKEN, &token) ;
>             zmq::message_t msg(params_, paramsSize_, 0) ;
>             socket.send(msg, 0) ;
>          }
>
>     bool recv()
>     {
>         socket.setsockopt(ZMQ_SET_NEXT_RECV_TOKEN, &token) ;
>         zmq::message_t msg(data, dataSize, 0) ;
>         return socket.recv(&msg, 0) ;
>     }
> }
>
> Any thoughts?
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev




More information about the zeromq-dev mailing list