[zeromq-dev] non blocking receive and api_thread_poll_rate
Jon Dyte
jon at totient.co.uk
Tue Jul 28 20:16:40 CEST 2009
Hi
I'm very glad I've just picked up this thread, because I was considering
coding up an api thread with an almost identical while loop around a
non-blocking read (actually it needs to alternate between blocking and
non-blocking, as it is also running some timers).
However, I did wonder whether there would be any interest in making the
receive call take an additional argument for a timeout so that the
user would not have to code up the loop.
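
A rough sketch of what such a call could look like, written as a free
function in C++. The name receive_with_timeout, the try_receive callable
and the poll_interval parameter are all hypothetical and not part of the
0MQ API; try_receive is assumed to behave like the non-blocking
api_thread->receive(my_message, false) call in the quoted loop, returning
a non-zero queue id on success and 0 otherwise.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, not part of the 0MQ API: retries a non-blocking
// receive until it reports a message or the timeout elapses.
// try_receive must return a non-zero queue id when a message was read
// and 0 when nothing was available.
inline int receive_with_timeout(
    const std::function<int()>& try_receive,
    std::chrono::milliseconds timeout,
    std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1))
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    for (;;) {
        const int queue_id = try_receive();
        if (queue_id != 0)
            return queue_id;    // message received
        if (std::chrono::steady_clock::now() >= deadline)
            return 0;           // timed out, nothing received
        std::this_thread::sleep_for(poll_interval);
    }
}
```

This only hides the loop from the caller. With the receive() behaviour
discussed in the quoted thread, each retry could still miss a waiting
message until inter-thread commands are processed.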
Thoughts?
Jon
On Tuesday 28 Jul 2009, Martin Sustrik wrote:
> Hi Pavel,
>
> > I have a multithreaded application, powered by 0MQ. One of the threads
> > has the following algorithm:
> >
> > while (true) {
> >     sleep one second;
> >     // use non-blocking receive()
> >     int queue_id = api_thread->receive (my_message, false);
> >     if (queue_id != 0) break; // break loop if a message was received
> > }
> >
> > It all works, but... the time between sending a message from another
> > thread and breaking the loop in this thread is over a minute! I looked
> > at the zeromq sources and saw the following code in the receive function:
> >
> > if (++ ticks == api_thread_poll_rate) {
> > ypollset_t::integer_t signals = pollset.check ();
> > if (signals)
> > process_commands (signals);
> > ticks = 0;
> > }
> >
> > Additionally, I found that api_thread_poll_rate is equal to 100. So the
> > time between calls to process_commands() in my case (because of the
> > sleep) is about 100 seconds.
> >
> > After that, I tried calling "api_thread->flush()" before
> > "api_thread->receive()" and this big delay between sending and
> > receiving the message disappeared! This is because the flush() function
> > explicitly calls process_commands().
> >
> > How can I solve this problem in a more elegant way than calling flush()?
>
> Right. There's a problem in the code. We've never tested this scenario
> (and it seems that no one else has tried it either).
>
> The algorithm with 'ticks' is used to avoid excessive polling for
> inter-thread commands in scenarios where messages are being passed
> at full speed. In that case, checking for inter-thread commands is
> done every 100 messages rather than after each message, which improves
> performance quite a lot.
>
> However, this is not your case. The messages in your example are sparse
> and checking for commands should be done immediately.
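
The effect of the tick counter on a sparse receiver can be reproduced with
a toy model. Everything below (toy_receiver, its two queues,
calls_until_received) is illustrative and is not the real
zmq::api_thread_t; process_commands() here just stands in for
pollset.check() followed by process_commands(signals).

```cpp
#include <queue>

// Toy model of the tick-based check; names are illustrative only.
struct toy_receiver {
    static constexpr int poll_rate = 100;  // plays the role of api_thread_poll_rate
    int ticks = 0;
    std::queue<int> pending;  // messages announced by other threads, not yet visible
    std::queue<int> local;    // messages visible to receive()

    // Stand-in for processing inter-thread commands: makes announced
    // messages visible to the receiver.
    void process_commands() {
        while (!pending.empty()) {
            local.push(pending.front());
            pending.pop();
        }
    }

    // Non-blocking receive with the original behaviour: commands are
    // processed only on every poll_rate-th call. Returns 0 if no
    // message is visible.
    int receive() {
        if (++ticks == poll_rate) {
            process_commands();
            ticks = 0;
        }
        if (local.empty())
            return 0;
        const int message = local.front();
        local.pop();
        return message;
    }
};

// Counts how many receive() calls are needed to see a message that is
// already waiting in another thread.
inline int calls_until_received() {
    toy_receiver receiver;
    receiver.pending.push(42);
    int calls = 0;
    for (;;) {
        ++calls;
        if (receiver.receive() != 0)
            return calls;
    }
}
```

In this model calls_until_received() returns 100. With a one-second sleep
between calls, that corresponds to the delay of roughly 100 seconds
reported above.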
>
> The problem is in zmq::api_thread_t::receive IMO:
>
> if (!block_)
> break;
>
> In other words, if a non-blocking receive is done and there's no message
> immediately available, exit the function and report that no message was
> received.
>
> However, there may be messages waiting in other threads, but you won't
> know unless you process commands from other threads (one of the commands
> is actually notification that there are messages waiting for you).
>
> My suggestion would be to modify the code so that in the case of a
> non-blocking receive it tries to get a message (no change) and, when not
> successful, it processes commands and tries to get the message once
> more. If unsuccessful, it'll return zero (no messages available).
>
> The pseudocode would be something like:
>
> bool second_pass = false;
>
> ...
>
> // We've got the message.
> if (retrieved)
> break;
>
> // No message and we've processed the commands already.
> // Thus no messages are waiting in other threads.
> if (second_pass)
> break;
>
> if (!block_ && !second_pass) {
>
> // No message, but we haven't processed commands so far...
> ypollset_t::integer_t signals = pollset.check ();
> if (signals) {
> process_commands (signals);
> ticks = 0;
>
> // Ok. Commands are processed. Let's try to get
> // the message once again.
> second_pass = true;
> continue;
> }
>
> // There's definitely no message.
> return 0;
> }
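
For illustration, the two-pass idea can be tried out in a self-contained
toy model. The type two_pass_receiver and its queues are hypothetical
stand-ins, not the real zmq::api_thread_t, and process_commands() here
simply reports whether any command was pending, in place of the
pollset.check() result in the pseudocode.

```cpp
#include <queue>

// Toy model of the proposed two-pass non-blocking receive; names are
// illustrative only.
struct two_pass_receiver {
    static constexpr int poll_rate = 100;
    int ticks = 0;
    std::queue<int> pending;  // messages announced by other threads, not yet visible
    std::queue<int> local;    // messages visible to receive()

    // Makes announced messages visible. Returns true if there was
    // anything to process.
    bool process_commands() {
        const bool any = !pending.empty();
        while (!pending.empty()) {
            local.push(pending.front());
            pending.pop();
        }
        return any;
    }

    // Non-blocking receive: try once, and if nothing is visible,
    // process commands and try exactly once more. Returns 0 if no
    // message is available.
    int receive() {
        if (++ticks == poll_rate) {
            process_commands();
            ticks = 0;
        }
        bool second_pass = false;
        for (;;) {
            if (!local.empty()) {
                const int message = local.front();
                local.pop();
                return message;      // we've got the message
            }
            if (second_pass)
                return 0;            // commands already processed, nothing waiting
            if (!process_commands())
                return 0;            // no commands, so definitely no message
            ticks = 0;
            second_pass = true;      // retry once after processing commands
        }
    }
};
```

In this model a message pushed to pending is returned by the very next
receive() call, rather than only on every poll_rate-th call.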
>
> Let us know whether this kind of fix helps.
>
> Martin