[zeromq-dev] Futures
Oliver Smith
oliver at kfs.org
Wed Aug 4 23:26:22 CEST 2010
Something that might excite a lot of developers towards ZeroMQ is an
easy-to-access Futures concept.
For those who don't know, a Future is a promise to return a result prior
to your need to use it.
In C++ you might write something like:
Future<int> i(calculationFunction, param1, param2) ;
... do other stuff ...
return x + i ; // Waits for the result of
calculationFunction to return to instantiate 'i'.
In ZeroMQ I envisage a "future_t" which is a super-set of message_t
socket_t socket(ZMQ_REQ) ;
socket.bind(futureWorkerUrl) ;
int i ;
#ifdef VARIANT1
future_t future(
calculationFunction //!< Function to invoke.
, ¶ms //!< parameter data
, sizeof(params) //!< size of param data.
, &i //!< destination
, sizeof(i)
) ;
socket.send(future, 0) ;
#else
future_t future(
socket //!< So it can track itself.
, calculationFunction//!< Function to invoke.
, ¶ms //!< parameter data.
, sizeof(params) //!< size of param data.
, &i //!< destination.
, sizeof(i)
) ;
future.send() ;
#endif
x = someOtherCalculation() ;
future.recv() ;
return x + i ;
This would lend itself to templates, so that C++/C#/Java etc could
declare implicit futures:
template<typename _ResultType>
class Future
{
public:
Future(zmq::socket_t& socket_
, zmq::future_func_t function_, const void* params_,
size_t paramsSize_)
: future(socket_, function_, params_, paramsSize_, &result,
sizeof(result))
{
}
public:
operator _ResultType & ()
{
future.recv() ; // Force data to be received.
return result ;
}
private:
zmq::future_t future ;
_ResultType result ;
} ;
int someFunction()
{
struct ComplexResult { int i ; float j ; char description[64] ; } ;
struct ComplexArg { size_t count, depth ; double seed ; } ;
ComplexArg args = { 99, 100, 1.234 } ;
Future<ComplexResult> complexFuture(socket, &args, sizeof(args)) ;
int x = ...
/* lots of other processing here */
// De-referencing
ComplexResult& result = complexFuture ;
printf("got result: %s\n", result.description) ;
return x + (int)(result.i * result.j) ;
}
But there's a gotcha, especially if you are using multiple Futures
concurrently or in the same function:
1. You either have to create a socket-per future, which means additional
tear up/down time, or;
2. You have to manually enforce the order in which you consume the results.
Based on my weekend experience with trying to implement parallel sort,
it seems to me that there needs to be a mechanism for tokenizing
messages so that a/ a random call to recv() can associate the resulting
data with something meaningful or b/ you can retrieve a specific message
from the queue.
Then the future_t could be implemented in terms of an object which does
something like:
struct future_t
{
zmq::socket_t socket ;
zmq::token_t token ;
void* data ;
size_t dataSize ;
future_t(zmq::socket_t& socket_, zmq::future_func_t func_, const
void* params_, size_t paramSize_, void* data_, size_t dataSize_)
: socket(socket_), data(data_), dataSize(dataSize_)
{
socket.getsockopt(ZMQ_GET_NEXT_SEND_TOKEN, &token) ;
zmq::message_t msg(params_, paramsSize_, 0) ;
socket.send(msg, 0) ;
}
bool recv()
{
socket.setsockopt(ZMQ_SET_NEXT_RECV_TOKEN, &token) ;
zmq::message_t msg(data, dataSize, 0) ;
return socket.recv(&msg, 0) ;
}
}
Any thoughts?
More information about the zeromq-dev
mailing list