[zeromq-dev] [PATCH] socket_base_t::recv() hangs intermittently when in blocking mode under certain conditions
Martin Sustrik
sustrik at 250bpm.com
Wed Nov 10 10:36:01 CET 2010
Hi Marc,
Thanks for the patch. I am currently out of town. I'll be back on
Friday. I'll review and apply your patch then.
Martin
On 10/11/2010, "Marc Rossi" <mrossi19 at gmail.com> wrote:
>Here is code that can recreate the problem. The first pastebin link shows
>changes that need to be made to the socket_base_t::recv() method. Didn't
>post the entire func but it should be obvious where it fits in. These
>changes need to be applied to the library linked into the test apps to recreate
>the timing necessary for the bug to surface.
>
>http://pastebin.com/RA2cJrt6
>
>The next two pastebin links are for pub.cpp and sub.cpp, respectively.
> Build and link against libzmq with the modified socket_base_t::recv()
>method.
>
>pub.cpp: http://pastebin.com/SqqaBnYc
>sub.cpp: http://pastebin.com/6s0xwtc8
>
>Follow the steps below:
>
>1. Start the pub process in a terminal -- it will create a PUB socket and
>wait for user input before publishing data.
>2. Start the sub process in a different terminal -- it will create a SUB
>socket and connect to the PUB socket from step 1. It will then wait for
>user input before attempting to read from the socket.
>3. Hit enter in the pub terminal. This will publish exactly 199 messages
>and then wait for user input before publishing more messages.
>4. Hit enter in the sub terminal. This will receive exactly 199 messages
>before the modified socket_base.cpp code pauses and waits for user input.
> At this point the sub socket has no messages on the queue and no signals to
>be read but is about to call process_commands().
>5. Hit enter in the pub terminal to allow it to continue publishing
>messages. The sub client now has both a message in the pipe and a revive
>signal waiting for it when it awakens.
>6. Hit enter in the sub terminal to allow it to process the newly published
>messages. Unfortunately it will first process the revive signal then call
>process_commands() further down with block = true. Since there are no more
>signals to be read it will block even though there are messages continuing
>to be added to the queue.
>
>Apply the patch previously provided and try the same test, all should work
>normally.
>
>
>On Tue, Nov 9, 2010 at 2:15 PM, Marc Rossi <mrossi19 at gmail.com> wrote:
>
>> Main thread calls recv() and hangs forever (after working fine for a period
>> of time), memory usage grows continuously while io thread pulls data from
>> socket and pushes on the internal queue. netstat -a shows no data in recv-q
>> because io thread continues to work properly and pull data from the socket.
>>
>> This occurs under the following scenario:
>>
>> User code calls socket_base_t::recv() indirectly through higher level
>> zeromq API call when there are no messages waiting. Previous 99
>> (inbound_poll_rate - 1) calls to the recv() function returned an already
>> waiting message fetched by the xrecv() call at the start of the function.
>>
>> This 100th call to recv(), as stated above, has no messages waiting to be
>> read so the xrecv() call fails and rc = -1. Immediately after this call to
>> xrecv() but BEFORE the conditional statement "if (++ticks ==
>> inbound_poll_rate)" a message arrives and is processed by the io thread,
>> resulting in the generation of a revive signal as the new message is pushed
>> onto the queue. Since ++ticks is now 100 (inbound_poll_rate) the above
>> conditional is true and app_thread_t::process_commands() is called,
>> processing the revive signal.
>>
>> Since this is a BLOCKING socket and rc != 0 we fall down to the loop at the
>> end of the recv() function that unfortunately for us calls the
>> app_thread_t::process_commands() method with block_ = true before calling
>> xrecv(). Since we already read the revive signal above we are now officially
>> hung as there is still a message in the queue and there will be no more
>> revive signals generated by the io thread because of that.
>>
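The sequence described above can be replayed deterministically in a toy, single-threaded model. This is only a sketch: Model, io_push, model_recv and run_scenario are hypothetical stand-ins rather than libzmq code, and "Hung" here means the real call would block with no revive signal left to wake it.

```cpp
#include <cassert>
#include <deque>

// Toy, single-threaded model of the race. All names are hypothetical
// stand-ins, not libzmq code.
const int inbound_poll_rate = 100;

struct Model {
    std::deque<int> queue;  // messages pushed by the "io thread"
    int signals = 0;        // pending revive signals
    bool asleep = false;    // true once xrecv() has seen an empty queue
    int ticks = 0;
};

// io thread side: a revive signal is sent only if the reader went to sleep.
void io_push(Model &m, int msg) {
    m.queue.push_back(msg);
    if (m.asleep) { ++m.signals; m.asleep = false; }
}

bool xrecv(Model &m, int &out) {
    if (m.queue.empty()) { m.asleep = true; return false; }
    out = m.queue.front();
    m.queue.pop_front();
    return true;
}

// Returns false where the real call would block with nothing to wake it.
bool process_commands(Model &m, bool block) {
    if (m.signals == 0) return !block;
    m.signals = 0;
    return true;
}

enum class Result { Received, Hung };

Result model_recv(Model &m, bool patched, bool race, int &out) {
    bool ok = xrecv(m, out);
    if (race) io_push(m, 42);  // message lands right after the failed xrecv()
    if (++m.ticks == inbound_poll_rate) {
        process_commands(m, false);  // consumes the revive signal, if any
        m.ticks = 0;
    }
    if (ok) return Result::Received;
    bool block = patched ? (m.ticks != 0) : true;
    while (!ok) {
        if (!process_commands(m, block)) return Result::Hung;
        ok = xrecv(m, out);
        m.ticks = 0;
        block = true;
    }
    return Result::Received;
}

// Runs 99 successful reads, then the racy 100th call.
Result run_scenario(bool patched) {
    Model m;
    int out = 0;
    for (int i = 0; i < inbound_poll_rate - 1; ++i) {
        io_push(m, i);
        Result r = model_recv(m, patched, false, out);
        assert(r == Result::Received);
        (void) r;
    }
    return model_recv(m, patched, true, out);
}
```

In this model, run_scenario(false) ends in Result::Hung, while run_scenario(true), which uses block = (ticks != 0) for the first pass of the loop, returns Result::Received.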
>> To test that this is indeed what is happening I did the following. Added an
>> integer reference as a third parameter to the
>> app_thread_t::process_commands() method that is set to the number of
>> commands received and processed. Immediately before AND after calling
>> process_commands() method in the final loop of socket_base_t::recv() I added
>> a debug print statement that is executed ONLY if the prior call to
>> process_commands() returned a value > 0 for the third param. After running
>> the test code for about an hour the scenario described above occurred with
>> the debug print prior to the process_commands() call being displayed and
>> then the process was hung.
>>
>> Below is the simple patch that seems to fix the problem for me. This will
>> incur a small penalty when ticks == 0 and there are no messages waiting to
>> be read as the initial call to process_commands will return immediately due
>> to block being set to false. This could be made more efficient if the
>> process_commands() method took a 3rd param as a bool that was set to true if
>> commands were actually processed, then we would ONLY set block = false when
>> the previous call to process_commands() actually did something, not rely on
>> the ticks = 0 line in the if/then block.
>>
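The more efficient variant suggested above might look roughly like the following toy model. It is a sketch under assumptions: the third "processed" parameter is the hypothetical addition, Reader and model_recv are invented names, and a false return marks the point where the real code would legitimately block waiting for a signal.

```cpp
#include <cassert>
#include <deque>

// Toy model with hypothetical names; not the libzmq implementation.
const int inbound_poll_rate = 100;

struct Reader {
    std::deque<int> queue;      // messages pushed by the "io thread"
    int signals = 0;            // pending revive signals
    bool asleep = false;        // true once xrecv() has seen an empty queue
    int ticks = 0;
    int nonblocking_polls = 0;  // counts process_commands() calls with block == false
};

void io_push(Reader &r, int msg) {
    r.queue.push_back(msg);
    if (r.asleep) { ++r.signals; r.asleep = false; }
}

bool xrecv(Reader &r, int &out) {
    if (r.queue.empty()) { r.asleep = true; return false; }
    out = r.queue.front();
    r.queue.pop_front();
    return true;
}

// Assumed variant: 'processed' reports whether any command was handled.
// Returns false where the real call would block waiting for a signal.
bool process_commands(Reader &r, bool block, bool &processed) {
    if (!block) ++r.nonblocking_polls;
    processed = r.signals > 0;
    r.signals = 0;
    return processed || !block;
}

bool model_recv(Reader &r, int &out, bool race) {
    bool ok = xrecv(r, out);
    if (race) io_push(r, 7);  // message lands right after the failed xrecv()
    bool processed = false;
    if (++r.ticks == inbound_poll_rate) {
        process_commands(r, false, processed);
        r.ticks = 0;
    }
    if (ok) return true;
    bool block = !processed;  // skip blocking only if a signal was just consumed
    while (!ok) {
        if (!process_commands(r, block, processed)) return false;
        ok = xrecv(r, out);
        r.ticks = 0;
        block = true;
    }
    return true;
}

// 99 successful reads, then the racy 100th call; -1 means "would block".
int racy_hundredth_call() {
    Reader r;
    int out = -1;
    for (int i = 0; i < inbound_poll_rate - 1; ++i) {
        io_push(r, i);
        model_recv(r, out, false);
    }
    return model_recv(r, out, true) ? out : -1;
}
```

Here the first pass of the loop is non-blocking only when the periodic process_commands() call actually consumed a signal, so an idle reader goes straight to the blocking call without the extra non-blocking pass.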
>>
>>
>>
>>
>> From 8d45a82d9cf7b788a3bed5014420962ea4ca5969 Mon Sep 17 00:00:00 2001
>> From: Marc Rossi <mrossi19 at gmail.com>
>> Date: Tue, 9 Nov 2010 13:46:06 -0600
>> Subject: [PATCH] Fix socket_t::recv() hang scenario where initial call to
>> process_commands() eats signal
>>
>> Added block boolean var to second process_commands() invocation for
>> blocking sockets instead of always using true. This prevents the
>> process_commands() call from hanging when a message is received with an
>> empty queue after the call to xrecv() but prior to the initial call to
>> process_commands() invoked when ++ticks == inbound_poll_rate.
>>
>> Signed-off-by: Marc Rossi <mrossi19 at gmail.com>
>> ---
>> src/socket_base.cpp | 4 +++-
>> 1 files changed, 3 insertions(+), 1 deletions(-)
>>
>> diff --git a/src/socket_base.cpp b/src/socket_base.cpp
>> index c933954..344b552 100644
>> --- a/src/socket_base.cpp
>> +++ b/src/socket_base.cpp
>> @@ -437,15 +437,17 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
>>
>>      //  In blocking scenario, commands are processed over and over again until
>>      //  we are able to fetch a message.
>> +    bool block = (ticks != 0);
>>      while (rc != 0) {
>>          if (errno != EAGAIN)
>>              return -1;
>> -        if (unlikely (!app_thread->process_commands (true, false))) {
>> +        if (unlikely (!app_thread->process_commands (block, false))) {
>>              errno = ETERM;
>>              return -1;
>>          }
>>          rc = xrecv (msg_, flags_);
>>          ticks = 0;
>> +        block = true;
>>      }
>>
>>      rcvmore = msg_->flags & ZMQ_MSG_MORE;
>> --
>> 1.7.2.3
>>
>>