[zeromq-dev] non blocking receive and api_thread_poll_rate

Martin Sustrik sustrik at fastmq.com
Tue Jul 28 14:48:00 CEST 2009


Hi Pavel,

> I have a multithreaded application powered by 0MQ. One of the threads runs
> the following algorithm:
> 
> while (true) {
>    sleep one second;
>    int queue_id=api_thread->receive(my_message,false); // use non-blocking receive()
>    if (queue_id!=0) break; // break loop, if message was received.
> }
> 
> Everything works, but... the time between sending a message from another
> thread and breaking the loop in this thread is over a minute! I looked at
> the zeromq sources and saw the following code in the receive function:
> 
>     if (++ ticks == api_thread_poll_rate) {
>         ypollset_t::integer_t signals = pollset.check ();
>         if (signals)
>             process_commands (signals);
>         ticks = 0;
>     }
> 
> Additionally, I found that api_thread_poll_rate is equal to 100. So, in
> my case (because of the sleep), the time between calls of
> process_commands() is about 100 seconds.
> 
> After that, I tried calling "api_thread->flush()" before
> "api_thread->receive()", and the big delay between sending and
> receiving the message disappeared! This is because the flush() function
> explicitly calls process_commands().
> 
> How can I solve this problem in a more elegant way than calling flush()?

Right. There's a problem in the code. We've never tested this scenario
(and it seems that no one else has tried it either).

The algorithm with 'ticks' is used to avoid excessive polling for
inter-thread commands in scenarios where messages are being passed
at full speed. In that case, inter-thread commands are checked once
every 100 messages rather than after each message, which improves
performance quite a lot.

However, this is not your case. The messages in your example are sparse 
and checking for commands should be done immediately.

The problem is in zmq::api_thread_t::receive IMO:

         if (!block_)
             break;

In other words: if a non-blocking receive is done and no message is
immediately available, the function exits and reports that no message
was received.

However, there may be messages waiting in other threads, but you won't 
know unless you process commands from other threads (one of the commands 
is actually notification that there are messages waiting for you).

My suggestion would be to modify the code so that, in the case of a
non-blocking receive, it tries to get a message (no change) and, when not
successful, it processes commands and tries to get the message once
more. If unsuccessful, it'll return zero (no messages available).

The pseudocode would be something like:

bool second_pass = false;

...

//  We've got the message.
if (retrieved)
     break;

//  No message and we've processed the commands already.
//  Thus no messages are waiting in other threads.
if (second_pass)
     break;

if (!block_) {

     //  No message, but we haven't processed commands so far...
     ypollset_t::integer_t signals = pollset.check ();
     if (signals) {
         process_commands (signals);
         ticks = 0;

         //  Ok. Commands are processed. Let's try to get
         //  the message once again.
         second_pass = true;
         continue;
     }

     //  There's definitely no message.
     return 0;
}

Let us know whether this kind of fix helps.

Martin
