[zeromq-dev] [PATCH] socket_base_t::recv() hangs intermittently when in blocking mode under certain conditions

Martin Sustrik sustrik at 250bpm.com
Wed Nov 10 10:36:01 CET 2010

Hi Marc,

Thanks for the patch. I am currently out of town and will be back on
Friday. I'll review and apply your patch then.


On 10/11/2010, "Marc Rossi" <mrossi19 at gmail.com> wrote:

>Here is code that can recreate the problem.   The first pastebin link shows
>changes that need to be made to the socket_base_t::recv() method.  Didn't
>post the entire func but it should be obvious where it fits in.  These
>changes need to be applied to the library linked into the test apps to recreate
>the timing necessary for the bug to surface.
>The next two pastebin links are for pub.cpp and sub.cpp, respectively.
>Build and link them against libzmq with the modified socket_base_t::recv().
>pub.cpp:  http://pastebin.com/SqqaBnYc
>sub.cpp:  http://pastebin.com/6s0xwtc8
>Follow the steps below:
>1. Start the pub process in a terminal -- it will create a PUB socket and
>wait for user input before publishing data.
>2. Start the sub process in a different terminal -- it will create a SUB
>socket and connect to the PUB socket from step 1.  It will then wait for
>user input before attempting to read from the socket.
>3. Hit enter in the pub terminal.  This will publish exactly 199 messages
>and then wait for user input before publishing more messages.
>4. Hit enter in the sub terminal.  This will receive exactly 199 messages
>before the modified socket_base.cpp code pauses and waits for user input.
>At this point the sub socket has no messages on the queue and no signals to
>be read but is about to call process_commands().
>5. Hit enter in the pub terminal to allow it to continue publishing
>messages.   The sub client now has both a message in the pipe and a revive
>signal waiting for it when it awakens.
>6. Hit enter in the sub terminal to allow it to process the newly published
>messages.  Unfortunately it will first process the revive signal then call
>process_commands() further down with block = true.  Since there are no more
>signals to be read it will block even though there are messages continuing
>to be added to the queue.
>Apply the patch previously provided and try the same test; all should work.
>On Tue, Nov 9, 2010 at 2:15 PM, Marc Rossi <mrossi19 at gmail.com> wrote:
>> Main thread calls recv() and hangs forever (after working fine for a period
>> of time), memory usage grows continuously while io thread pulls data from
>> socket and pushes on the internal queue. netstat -a shows no data in recv-q
>> because io thread continues to work properly and pull data from the socket.
>> This occurs under the following scenario:
>> User code calls socket_base_t::recv() indirectly through higher level
>> zeromq API call when there are no messages waiting. Previous 99
>> (inbound_poll_rate - 1) calls to the recv() function returned an already
>> waiting message fetched by the xrecv() call at the start of the function.
>> This 100th call to recv(), as stated above, has no messages waiting to be
>> read, so the xrecv() call fails with rc = -1. Immediately after this call to
>> xrecv() but BEFORE the conditional statement "if (++ticks ==
>> inbound_poll_rate)" a message arrives and is processed by the io thread,
>> resulting in the generation of a revive signal as the new message is pushed
>> onto the queue. Since ++ticks is now 100 (inbound_poll_rate) the above
>> conditional is true and app_thread_t::process_commands() is called,
>> processing the revive signal.
>> Since this is a BLOCKING socket and rc != 0 we fall down to the loop at the
>> end of the recv() function that unfortunately for us calls the
>> app_thread_t::process_commands() method with block_ = true before calling
>> xrecv(). Since we already read the revive signal above we are now officially
>> hung as there is still a message in the queue and there will be no more
>> revive signals generated by the io thread because of that.
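The sequence described above can be replayed deterministically in a small single-threaded model. This is only a sketch under stated assumptions: MockSocket, Outcome, replay() and the race_window hook are hypothetical stand-ins, not libzmq code, and a blocking wait with no pending signal is reported as an outcome instead of actually blocking. The patched flag switches between the original loop and the block = (ticks != 0) variant from the patch.

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical single-threaded model of the recv() path described above.
// None of these names are real libzmq internals.
enum Outcome { GotMessage, BlockedOnEmptyQueue, BlockedWithMessageQueued };

struct MockSocket {
    static constexpr int inbound_poll_rate = 100;
    std::deque<int> queue;       // messages pushed by the "io thread"
    int pending_signals = 0;     // revive signals not yet consumed
    bool reader_asleep = false;  // set when a read finds the queue empty
    int ticks = 0;

    // io thread side: a revive signal is sent only if the reader is asleep.
    void push(int msg) {
        queue.push_back(msg);
        if (reader_asleep) {
            reader_asleep = false;
            ++pending_signals;
        }
    }

    // Returns 0 on success, -1 when nothing is readable (EAGAIN in the real code).
    int xrecv(int &msg) {
        if (queue.empty()) {
            reader_asleep = true;
            return -1;
        }
        msg = queue.front();
        queue.pop_front();
        return 0;
    }

    // Returns false where the real call would sleep waiting for a signal.
    bool process_commands(bool block) {
        if (pending_signals > 0) {
            pending_signals = 0;
            return true;
        }
        return !block;
    }

    Outcome recv(int &msg, bool patched, const std::function<void()> &race_window) {
        int rc = xrecv(msg);
        race_window();  // the io thread may push a message exactly here
        if (++ticks == inbound_poll_rate) {
            process_commands(false);  // may consume the revive signal
            ticks = 0;
        }
        if (rc == 0)
            return GotMessage;

        // Original code always blocks; the patch skips blocking on the first
        // pass when the tick check has just run.
        bool block = patched ? (ticks != 0) : true;
        while (rc != 0) {
            if (!process_commands(block))
                return queue.empty() ? BlockedOnEmptyQueue : BlockedWithMessageQueued;
            rc = xrecv(msg);
            ticks = 0;
            block = true;
        }
        return GotMessage;
    }
};

// Replays the scenario: 99 successful reads, then a message that arrives in
// the race window of the 100th call.
Outcome replay(bool patched) {
    MockSocket s;
    int msg = 0;
    for (int i = 0; i < MockSocket::inbound_poll_rate - 1; ++i) {
        s.push(i);
        Outcome o = s.recv(msg, patched, [] {});
        assert(o == GotMessage);
        (void)o;
    }
    return s.recv(msg, patched, [&s] { s.push(1000); });
}
```

In this model replay(false) ends in BlockedWithMessageQueued, which corresponds to the reported hang, while replay(true) returns GotMessage.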
>> To test that this is indeed what is happening I did the following. Added an
>> integer reference as a third parameter to the
>> app_thread_t::process_commands() method that is set to the number of
>> commands received and processed. Immediately before AND after calling
>> process_commands() method in the final loop of socket_base_t::recv() I added
>> a debug print statement that is executed ONLY if the prior call to
>> process_commands() returned a value > 0 for the third param. After running
>> the test code for about an hour the scenario described above occurred with
>> the debug print prior to the process_commands() call being displayed and
>> then the process was hung.
>> Below is the simple patch that seems to fix the problem for me. This will
>> incur a small penalty when ticks == 0 and there are no messages waiting to
>> be read as the initial call to process_commands will return immediately due
>> to block being set to false. This could be made more efficient if the
>> process_commands() method took a 3rd param as a bool that was set to true if
>> commands were actually processed, then we would ONLY set block = false when
>> the previous call to process_commands() actually did something, not rely on
>> the ticks = 0 line in the if/then block.
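The more efficient variant suggested above might look roughly like the following. It is a sketch, not the actual libzmq API: FlagModel, the bool out-parameter on process_commands() and the message_races_in switch are assumptions made for illustration, and a blocking wait is modelled by returning false instead of sleeping. The nonblocking_polls counter exists only to show how many non-blocking polls each path costs.

```cpp
#include <cassert>
#include <deque>

// Hypothetical model; names do not correspond to real libzmq internals.
struct FlagModel {
    static constexpr int inbound_poll_rate = 100;
    std::deque<int> queue;       // messages pushed by the "io thread"
    int pending_signals = 0;     // revive signals not yet consumed
    bool reader_asleep = false;  // set when a read finds the queue empty
    int ticks = 0;
    int nonblocking_polls = 0;   // counts process_commands(false, ...) calls

    void push(int msg) {
        queue.push_back(msg);
        if (reader_asleep) {
            reader_asleep = false;
            ++pending_signals;
        }
    }

    int xrecv(int &msg) {
        if (queue.empty()) {
            reader_asleep = true;
            return -1;
        }
        msg = queue.front();
        queue.pop_front();
        return 0;
    }

    // Assumed third parameter: set to true if at least one command was
    // processed. Returns false where the real call would sleep.
    bool process_commands(bool block, bool &processed) {
        if (!block)
            ++nonblocking_polls;
        processed = pending_signals > 0;
        pending_signals = 0;
        return processed || !block;
    }

    // Returns true if a message was fetched, false if the call would block.
    bool recv(int &msg, bool message_races_in) {
        int rc = xrecv(msg);
        if (message_races_in)
            push(42);  // io thread pushes during the race window
        bool processed = false;
        if (++ticks == inbound_poll_rate) {
            process_commands(false, processed);
            ticks = 0;
        }
        if (rc == 0)
            return true;

        // Skip blocking on the first pass only if a command was just consumed.
        bool block = !processed;
        while (rc != 0) {
            bool ignored = false;
            if (!process_commands(block, ignored))
                return false;
            rc = xrecv(msg);
            ticks = 0;
            block = true;
        }
        return true;
    }
};
```

With ticks preset to inbound_poll_rate - 1, the racing case still fetches the message, and the idle case goes straight to the blocking wait after a single non-blocking poll, whereas the ticks-based check would issue a second one.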
>> From 8d45a82d9cf7b788a3bed5014420962ea4ca5969 Mon Sep 17 00:00:00 2001
>> From: Marc Rossi <mrossi19 at gmail.com>
>> Date: Tue, 9 Nov 2010 13:46:06 -0600
>> Subject: [PATCH] Fix socket_t::recv() hang scenario where initial call to
>> process_commands() eats signal
>> Added block boolean var to second process_commands() invocation for
>> blocking sockets instead of always using true.  This prevents the
>> process_commands() call from hanging when a message is received with an
>> empty queue after the call to xrecv() but prior to the initial call to
>> process_commands() invoked when ++ticks == inbound_poll_rate.
>> Signed-off-by: Marc Rossi <mrossi19 at gmail.com>
>> ---
>>  src/socket_base.cpp |    4 +++-
>>  1 files changed, 3 insertions(+), 1 deletions(-)
>> diff --git a/src/socket_base.cpp b/src/socket_base.cpp
>> index c933954..344b552 100644
>> --- a/src/socket_base.cpp
>> +++ b/src/socket_base.cpp
>> @@ -437,15 +437,17 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
>>      //  In blocking scenario, commands are processed over and over again until
>>      //  we are able to fetch a message.
>> +    bool block = (ticks != 0);
>>      while (rc != 0) {
>>          if (errno != EAGAIN)
>>              return -1;
>> -        if (unlikely (!app_thread->process_commands (true, false))) {
>> +        if (unlikely (!app_thread->process_commands (block, false))) {
>>              errno = ETERM;
>>              return -1;
>>          }
>>          rc = xrecv (msg_, flags_);
>>          ticks = 0;
>> +        block = true;
>>      }
>>      rcvmore = msg_->flags & ZMQ_MSG_MORE;
>> --
