[zeromq-dev] Futures
Matt Weinstein
matt_weinstein at yahoo.com
Wed Aug 4 23:38:38 CEST 2010
Isn't this just ZMQ_NOW and ZMQ_LATER ? :-)
Easy to do with the current model, and if you could leverage iostreams::
zmq_future_stream compute_this(functoid);
compute_this << args << endm;
.... sometime later ....
compute_this >> result1 >> result2;
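
A rough sketch of how such a stream-style future could behave, using only the C++ standard library (std::async from <future>) in place of a ZeroMQ socket. FutureStream, EndMessage and the int-only argument/result types are assumptions made for illustration; zmq_future_stream does not exist in ZeroMQ:

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <utility>
#include <vector>

// Hypothetical end-of-message marker, mirroring `endm` in the snippet above.
struct EndMessage {};
constexpr EndMessage endm{};

// Hypothetical stream-style future over int arguments and int results.
class FutureStream {
public:
    using Functoid = std::function<std::vector<int>(std::vector<int>)>;

    explicit FutureStream(Functoid fn) : fn_(std::move(fn)) {}

    // Buffer one argument; nothing is computed yet.
    FutureStream& operator<<(int arg) {
        args_.push_back(arg);
        return *this;
    }

    // Seal the argument list and start the computation in the background.
    FutureStream& operator<<(EndMessage) {
        pending_ = std::async(std::launch::async, fn_, std::move(args_));
        args_.clear();
        return *this;
    }

    // Block on the first read after a send, then hand out results in order.
    FutureStream& operator>>(int& out) {
        if (pending_.valid()) {
            results_ = pending_.get();
            next_ = 0;
        }
        out = results_.at(next_++);  // throws std::out_of_range if exhausted
        return *this;
    }

private:
    Functoid fn_;
    std::vector<int> args_;
    std::future<std::vector<int>> pending_;
    std::vector<int> results_;
    std::size_t next_ = 0;
};
```

With a functoid that returns {sum, product}, writing `s << 2 << 3 << 4 << endm;` and later `s >> r1 >> r2;` leaves the two results in r1 and r2. A ZeroMQ-backed version would send a message at `endm` and call recv() on the first `>>`.
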
On Aug 4, 2010, at 5:26 PM, Oliver Smith wrote:
> Something that might excite a lot of developers towards ZeroMQ is an
> easy-to-access Futures concept.
>
> For those who don't know, a Future is a promise to return a result prior
> to your need to use it.
>
> In C++ you might write something like:
>
> Future<int> i(calculationFunction, param1, param2) ;
> ... do other stuff ...
> return x + i ; // Waits for calculationFunction to return, which instantiates 'i'.
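
For comparison, a minimal sketch of the same idea using only the C++ standard library (std::async and std::future from <future>), with no ZeroMQ involved. The Future wrapper below and the body of calculationFunction are illustrative assumptions, not part of any ZeroMQ API:

```cpp
#include <future>
#include <utility>

// Hypothetical stand-in for the calculationFunction named in the thread.
int calculationFunction(int a, int b) { return a * b; }

// Thin wrapper: starts the work on construction and blocks the first time
// the object is converted to its result type.
template <typename T>
class Future {
public:
    template <typename F, typename... Args>
    explicit Future(F&& f, Args&&... args)
        : pending_(std::async(std::launch::async,
                              std::forward<F>(f),
                              std::forward<Args>(args)...)) {}

    // Implicit conversion forces the result to be collected.
    operator T&() {
        if (pending_.valid()) {
            result_ = pending_.get();  // waits here if the work is unfinished
        }
        return result_;
    }

private:
    std::future<T> pending_;
    T result_{};
};

int compute() {
    Future<int> i(calculationFunction, 6, 7);  // work starts in the background
    int x = 100;                               // ... do other stuff ...
    return x + i;                              // blocks until 'i' is ready
}
```

Under these assumptions compute() returns 142. The ZeroMQ variant discussed here would replace std::async with a send on a socket and the get() call with a recv().
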
>
> In ZeroMQ I envisage a "future_t" which is a super-set of message_t
>
> socket_t socket(ZMQ_REQ) ;
> socket.bind(futureWorkerUrl) ;
>
> int i ;
> #ifdef VARIANT1
> future_t future(
> calculationFunction //!< Function to invoke.
> , &params //!< parameter data
> , sizeof(params) //!< size of param data.
> , &i //!< destination
> , sizeof(i)
> ) ;
> socket.send(future, 0) ;
> #else
> future_t future(
> socket //!< So it can track itself.
> , calculationFunction//!< Function to invoke.
> , &params //!< parameter data.
> , sizeof(params) //!< size of param data.
> , &i //!< destination.
> , sizeof(i)
> ) ;
> future.send() ;
> #endif
>
> x = someOtherCalculation() ;
> future.recv() ;
> return x + i ;
>
> This would lend itself to templates, so that C++/C#/Java etc could
> declare implicit futures:
>
> template<typename _ResultType>
> class Future
> {
> public:
> Future(zmq::socket_t& socket_
> , zmq::future_func_t function_, const void* params_, size_t paramsSize_)
> : future(socket_, function_, params_, paramsSize_, &result, sizeof(result))
> {
> }
>
> public:
> operator _ResultType & ()
> {
> future.recv() ; // Force data to be received.
> return result ;
> }
>
> private:
> zmq::future_t future ;
> _ResultType result ;
> } ;
>
> int someFunction()
> {
> struct ComplexResult { int i ; float j ; char description[64] ; } ;
> struct ComplexArg { size_t count, depth ; double seed ; } ;
>
> ComplexArg args = { 99, 100, 1.234 } ;
> Future<ComplexResult> complexFuture(socket, calculationFunction, &args, sizeof(args)) ;
>
> int x = ...
> /* lots of other processing here */
>
> // De-referencing
> ComplexResult& result = complexFuture ;
> printf("got result: %s\n", result.description) ;
> return x + (int)(result.i * result.j) ;
> }
>
> But there's a gotcha, especially if you are using multiple Futures
> concurrently or in the same function:
>
> 1. You either have to create a socket per future, which means additional
> setup/teardown time, or;
> 2. You have to manually enforce the order in which you consume the results.
>
> Based on my weekend experience with trying to implement parallel sort,
> it seems to me that there needs to be a mechanism for tokenizing
> messages so that (a) a random call to recv() can associate the resulting
> data with something meaningful, or (b) you can retrieve a specific message
> from the queue.
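
The token idea can be sketched without any socket at all. In the snippet below, an in-memory deque stands in for the wire, and TokenMailbox, Reply, nextSendToken(), deliver() and recvFor() are all hypothetical names chosen for illustration (requires C++17 for std::optional). It covers option (b): asking for one specific reply while replies for other tokens are parked instead of lost.

```cpp
#include <cstdint>
#include <deque>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical reply envelope: the token echoed back by the worker, plus payload.
struct Reply {
    std::uint64_t token;
    std::string payload;
};

// Hands out a token per request and lets a caller claim a specific reply
// later, regardless of the order in which replies arrive.
class TokenMailbox {
public:
    // Token to attach to the next outgoing request.
    std::uint64_t nextSendToken() { return ++lastToken_; }

    // Simulates a reply arriving from the network.
    void deliver(Reply r) { wire_.push_back(std::move(r)); }

    // Returns the payload for `token`, or nullopt if it has not arrived.
    // Replies for other tokens seen on the way are parked, not dropped.
    std::optional<std::string> recvFor(std::uint64_t token) {
        auto parked = parked_.find(token);
        if (parked != parked_.end()) {
            std::string payload = std::move(parked->second);
            parked_.erase(parked);
            return payload;
        }
        while (!wire_.empty()) {
            Reply r = std::move(wire_.front());
            wire_.pop_front();
            if (r.token == token) {
                return std::move(r.payload);
            }
            parked_[r.token] = std::move(r.payload);
        }
        return std::nullopt;
    }

private:
    std::uint64_t lastToken_ = 0;
    std::deque<Reply> wire_;                       // arrival order
    std::map<std::uint64_t, std::string> parked_;  // out-of-order replies
};
```

With this shape, two futures sharing one socket can each call recvFor() with their own token in any order, which avoids both the socket-per-future cost and the manual ordering constraint.
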
>
> Then the future_t could be implemented in terms of an object which does
> something like:
>
> struct future_t
> {
> zmq::socket_t& socket ; // held by reference: sockets are not copyable
> zmq::token_t token ;
> void* data ;
> size_t dataSize ;
>
> future_t(zmq::socket_t& socket_, zmq::future_func_t func_, const void* params_,
> size_t paramsSize_, void* data_, size_t dataSize_)
> : socket(socket_), data(data_), dataSize(dataSize_)
> {
> socket.getsockopt(ZMQ_GET_NEXT_SEND_TOKEN, &token) ;
> zmq::message_t msg(params_, paramsSize_, 0) ;
> socket.send(msg, 0) ;
> }
>
> bool recv()
> {
> socket.setsockopt(ZMQ_SET_NEXT_RECV_TOKEN, &token) ;
> zmq::message_t msg(data, dataSize, 0) ;
> return socket.recv(&msg, 0) ;
> }
> } ;
>
> Any thoughts?