[zeromq-dev] zmq_poll: timeout issue
Matt Weinstein
matt_weinstein at yahoo.com
Tue Aug 10 19:35:06 CEST 2010
Basically: reverse the normal "server" paradigm. Use an XREP socket
on the device side, and have each server send a single message on its
REQ socket when it wakes up. This message identifies the server to the
device by UUID. After that the server is back to alternating
recv-send cycles (just offset by one).
[[ clients ]] -- [REQ] = [XREP] --- DEVICE --- [XREP] = [REQ] -- [[ servers ]]
The server UUIDs go into a "server free list", which can then be used
to schedule clients.
When you read the client packet stream (UUID, NULLPACKET, data) to
service a client, you need to keep the <serverUUID, clientUUID> pair
in a(n unordered) map, so that when the server finishes, the client
response can be transmitted and the server pushed back onto the free
list.
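As a rough sketch of reading that (UUID, NULLPACKET, data) stream: assume the frames of one multipart message have already been received into a vector of strings. The `ClientRequest` struct and `split_envelope` function are hypothetical names for illustration, not part of 0MQ.

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical view of one client request as seen on the device's XREP socket.
struct ClientRequest {
    std::string client_uuid;        // identity frame added by XREP
    std::vector<std::string> data;  // everything after the empty delimiter
};

// Split [UUID][empty][data...] into its parts. Returns nullopt when the
// frames do not have that shape (too few frames, or a non-empty delimiter).
std::optional<ClientRequest> split_envelope(const std::vector<std::string>& frames) {
    if (frames.size() < 3 || !frames[1].empty()) return std::nullopt;
    ClientRequest req;
    req.client_uuid = frames[0];
    req.data = std::vector<std::string>(frames.begin() + 2, frames.end());
    return req;
}
```

Keeping the UUID separate from the data is what lets the device store it in the map and rebuild the envelope when the server's reply comes back.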
If there is no server when you receive a client request
( server_free_list.empty() ) you can then respond back immediately
with a failure message.
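The free list and the map could be kept together in something like the sketch below. This is only the bookkeeping, with no socket I/O, and `ServerScheduler` and its member names are made up for illustration (C++17 for `std::optional`).

```cpp
#include <deque>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical device-side bookkeeping: which servers are free, and which
// client each busy server is working for.
class ServerScheduler {
public:
    // A server sent its wake-up message: put it on the free list.
    void server_ready(const std::string& server_uuid) {
        server_free_list_.push_back(server_uuid);
    }

    // A client request arrived. Returns the server to forward it to, or
    // nullopt when no server is free (the caller can then reply with a
    // failure message straight away).
    std::optional<std::string> assign(const std::string& client_uuid) {
        if (server_free_list_.empty()) return std::nullopt;
        std::string server = server_free_list_.front();
        server_free_list_.pop_front();
        busy_[server] = client_uuid;
        return server;
    }

    // A busy server replied. Returns the client to route the reply to and
    // pushes the server back on the free list; nullopt if it was not busy.
    std::optional<std::string> complete(const std::string& server_uuid) {
        auto it = busy_.find(server_uuid);
        if (it == busy_.end()) return std::nullopt;
        std::string client = it->second;
        busy_.erase(it);
        server_free_list_.push_back(server_uuid);
        return client;
    }

    bool has_free_server() const { return !server_free_list_.empty(); }

private:
    std::deque<std::string> server_free_list_;
    std::unordered_map<std::string, std::string> busy_;  // server -> client
};
```

A message from the server side whose UUID is not in the busy map would be treated as the wake-up message (`server_ready`); anything else is a reply (`complete`).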
Alternatively, if you don't want to dequeue requests until a server is
free, just poll ONLY the server list while server_free_list.empty().
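One cheap way to do that, assuming the server-side socket is placed first in the poll item array, is to vary the item count passed to the poll call. `items_to_poll` is a hypothetical helper, and the array layout is my assumption:

```cpp
// Assumed layout of the poll item array:
//   items[0] = server-side XREP, items[1] = client-side XREP.
// Returns how many leading items to pass to the poll call, so that the
// client side is simply not looked at while no server is free.
inline int items_to_poll(bool server_free_list_empty) {
    return server_free_list_empty ? 1 : 2;
}
```

Client requests then stay queued inside 0MQ until a server reports in, instead of being dequeued by the device.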
If you need to handle a timeout scenario, pull the client requests
out of the XREP, put them into unrolled lists of zmq_msg_t or
equivalent, and build a small timer chain that returns an error for
each expired request and removes it from a set (or unordered_map)
holding the pending requests.
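A minimal sketch of such a timer chain, assuming every request gets the same timeout so that arrival order is also deadline order. `PendingRequests` and its members are illustrative names only, and the stored string stands in for whatever message data you would really keep.

```cpp
#include <chrono>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical holder for client requests waiting for a free server.
class PendingRequests {
public:
    using Id = std::uint64_t;

    explicit PendingRequests(Clock::duration timeout) : timeout_(timeout) {}

    // Queue a request that arrived at `now`; returns an id for served().
    Id add(const std::string& client_uuid, Clock::time_point now) {
        Id id = next_id_++;
        chain_.push_back(Entry{now + timeout_, id});
        waiting_.emplace(id, client_uuid);
        return id;
    }

    // The request was handed to a server; its chain entry becomes stale.
    void served(Id id) { waiting_.erase(id); }

    // Pop every entry whose deadline has passed and return the clients that
    // were still waiting, so the caller can send each one an error reply.
    std::vector<std::string> expire(Clock::time_point now) {
        std::vector<std::string> timed_out;
        while (!chain_.empty() && chain_.front().deadline <= now) {
            auto it = waiting_.find(chain_.front().id);
            if (it != waiting_.end()) {
                timed_out.push_back(it->second);
                waiting_.erase(it);
            }
            chain_.pop_front();
        }
        return timed_out;
    }

private:
    struct Entry {
        Clock::time_point deadline;
        Id id;
    };
    Clock::duration timeout_;
    Id next_id_ = 0;
    std::deque<Entry> chain_;                      // oldest deadline first
    std::unordered_map<Id, std::string> waiting_;  // id -> client UUID
};
```

Calling `expire(Clock::now())` once per pass through the poll loop is enough; the front of the chain also tells you how long the next poll may sleep.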
(The zmq_reactor library should help a bit here, that's what I'm using
for all this...)
Best,
Matt
On Aug 10, 2010, at 12:46 PM, Ilja Golshtein wrote:
> Martin, Matt,
>
> thank you very much for the clarification
> and for the speedy answers.
>
> To be honest I don't see why spurious wakeups are unavoidable in
> 0mq ... though it doesn't matter.
>
> Here is a mockup of client application
>
> ==
> // One I/O thread in the thread pool will do.
> zmq::context_t ctx (1);
> std::auto_ptr<zmq::socket_t> sp;
> sp.reset(new zmq::socket_t(ctx, ZMQ_REQ/*ZMQ_XREQ*/));
>
> // Connect to the server.
> sp->connect ("tcp://localhost:23001");
>
> for (int i = 0; i != 20; i++)
> {
>     zmq::message_t request (10);
>     strcpy((char*)request.data (), "AAAAAAAAA");
>     sp->send (request);
>     std::cout << "sent" << std::endl;
>
>     zmq_pollitem_t items[1];
>     items[0].socket = *sp.get();
>     items[0].events = ZMQ_POLLIN;
>
>     // Remember when we started waiting for this reply.
>     struct timespec tp1;
>     ::clock_gettime(CLOCK_MONOTONIC, &tp1);
>     for (;;)
>     {
>         // Timeout is in microseconds (0MQ 2.x), i.e. 3 seconds.
>         int poll_ret = zmq::poll(items, 1, 3000000);
>         // revents is a bit mask, so test the flag rather than compare.
>         if (items[0].revents & ZMQ_POLLIN)
>         {
>             assert(poll_ret > 0);
>
>             // Get the reply.
>             zmq::message_t reply;
>             sp->recv(&reply);
>             std::cout << (char*)reply.data() << std::endl;
>             break;
>         }
>         else
>         {
>             std::cout << "no event fired, " <<
>                 "poll_ret=" << poll_ret << std::endl;
>             struct timespec tp2;
>             ::clock_gettime(CLOCK_MONOTONIC, &tp2);
>             if (tp2.tv_sec - tp1.tv_sec < 3)
>             {
>                 // Spurious wakeup: keep waiting.
>                 continue;
>             }
>             else
>             {
>                 // Give up on this request and start over with a new socket.
>                 std::cout << "reconnecting - message lost" << std::endl;
>                 sp.reset(new zmq::socket_t(ctx, ZMQ_REQ));
>                 sp->connect ("tcp://localhost:23001");
>                 break;
>             }
>         }
>     }
> }
> ==
>
> The goals are
> 1. Don't get stuck if the response is lost
> 2. Don't block if the server is unavailable.
>
> The only reason for recreating the socket is the REQ/REP paradigm
> (perhaps XREQ/XREP are more suitable).
>
> Suggestions are appreciated.
>
> Thanks.
>
>
> 10.08.2010, 18:07, "Matt Weinstein" <matt_weinstein at yahoo.com>:
>> This is the same problem as pthread condition variables have, e.g.:
>>
>> http://www.justsoftwaresolutions.co.uk/threading/condition-variable-spurious-wakes.html
>>
>> Just think of the revents as condition variables to check...
>>
>> Does someone want to write a quick wrapper ?
>>
>> PS I recommend CLOCK_MONOTONIC, but it may not be available, and
>> requires -lrt ...
>>
> --
> Best regards
> Ilja Golshtein
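On the "quick wrapper" idea from the quoted thread: a minimal sketch of a deadline loop that treats the poll result like a condition variable, re-checking and going back to sleep with the time that is left. `poll_until` and the `poll_once` callable are hypothetical; `poll_once` is assumed to wrap the real poll call (e.g. zmq::poll on one item) and to return true only when the awaited flag is actually set in revents.

```cpp
#include <chrono>

using Clock = std::chrono::steady_clock;

// Calls poll_once(remaining) until it reports the event or the overall
// timeout has elapsed. Returns true if the event fired, false on timeout.
// At least one poll is always made, even with a zero timeout.
template <typename PollOnce>
bool poll_until(PollOnce poll_once, std::chrono::microseconds timeout) {
    using std::chrono::microseconds;
    const Clock::time_point deadline = Clock::now() + timeout;
    for (;;) {
        microseconds remaining =
            std::chrono::duration_cast<microseconds>(deadline - Clock::now());
        if (remaining < microseconds::zero()) remaining = microseconds::zero();
        if (poll_once(remaining)) return true;        // real event
        if (Clock::now() >= deadline) return false;   // genuine timeout
        // Otherwise it was a spurious wakeup: poll again with what is left.
    }
}
```

With this, the client loop in the mockup reduces to one call such as `poll_until(wait_for_reply, std::chrono::seconds(3))`, where `wait_for_reply` is whatever lambda wraps the actual poll; microseconds are used because that matches the 3000000 passed for a three-second wait in the mockup.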