[zeromq-dev] [PATCH] socket_base_t::recv() hangs intermittently when in blocking mode under certain conditions

Marc Rossi mrossi19 at gmail.com
Wed Nov 10 01:13:56 CET 2010


Here is code that can recreate the problem.  The first pastebin link shows
the changes that need to be made to the socket_base_t::recv() method.  I didn't
post the entire function, but it should be obvious where the changes fit in.
They need to be applied to the library linked into the test apps in order to
recreate the timing necessary for the bug to surface.

http://pastebin.com/RA2cJrt6

The next two pastebin links are for pub.cpp and sub.cpp, respectively.
Build both and link them against libzmq with the modified
socket_base_t::recv() method.

pub.cpp:  http://pastebin.com/SqqaBnYc
sub.cpp:  http://pastebin.com/6s0xwtc8

Follow these steps:

1. Start the pub process in a terminal -- it will create a PUB socket and
wait for user input before publishing data.
2. Start the sub process in a different terminal -- it will create a SUB
socket and connect to the PUB socket from step 1.  It will then wait for
user input before attempting to read from the socket.
3. Hit enter in the pub terminal.  This will publish exactly 199 messages
and then wait for user input before publishing more messages.
4. Hit enter in the sub terminal.  This will receive exactly 199 messages
before the modified socket_base.cpp code pauses and waits for user input.
At this point the sub socket has no messages on the queue and no signals to
be read, but is about to call process_commands().
5. Hit enter in the pub terminal to allow it to continue publishing
messages.  The sub client now has both a message in the pipe and a revive
signal waiting for it when it awakens.
6. Hit enter in the sub terminal to allow it to process the newly published
messages.  Unfortunately it will first process the revive signal and then call
process_commands() further down with block = true.  Since there are no more
signals to be read, it will block even though messages continue to be added
to the queue.
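
The sequence in steps 4-6 can be sketched as a toy, single-threaded model of
the unpatched receive path.  This is only an illustration under simplifying
assumptions: toy_socket, io_push and the race hook are made-up names (not
libzmq API), the io thread is replaced by a callback that fires in the window
between xrecv() and the ticks check, and a call that would block forever is
reported as a false return instead of actually sleeping.

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Value quoted in the original report (99 == inbound_poll_rate - 1).
const int inbound_poll_rate = 100;

// Toy stand-in for the socket; every name here is hypothetical.
struct toy_socket {
    std::deque<int> pipe;        // messages pushed by the "io thread"
    int signals = 0;             // pending revive signals
    bool reader_asleep = false;  // set once xrecv() has seen an empty pipe
    int ticks = 0;

    // The io thread sends a revive signal only if the reader had
    // previously found the pipe empty.
    void io_push(int msg) {
        pipe.push_back(msg);
        if (reader_asleep) { ++signals; reader_asleep = false; }
    }

    bool xrecv(int &msg) {
        if (pipe.empty()) { reader_asleep = true; return false; }
        msg = pipe.front();
        pipe.pop_front();
        return true;
    }

    // Returns false where the real call would sleep forever:
    // blocking mode with no signal pending.
    bool process_commands(bool block) {
        if (signals > 0) { signals = 0; return true; }
        return !block;
    }

    // Unpatched logic.  `race` runs between xrecv() and the ticks check.
    // Returns false if the call would hang.
    bool recv(int &msg, const std::function<void()> &race = [] {}) {
        bool ok = xrecv(msg);
        race();
        if (++ticks == inbound_poll_rate) {
            process_commands(false);  // may consume the revive signal
            ticks = 0;
        }
        while (!ok) {
            if (!process_commands(true))
                return false;         // would block forever
            ok = xrecv(msg);
            ticks = 0;
        }
        return true;
    }
};

// Replays the race; returns true if the model reports a hang
// while a message is still sitting in the pipe.
bool demo_unpatched_hangs() {
    toy_socket s;
    int msg = 0;
    for (int i = 0; i < inbound_poll_rate - 1; ++i) {
        s.io_push(i);
        bool got = s.recv(msg);  // a message is already waiting
        assert(got);
        (void) got;
    }
    // 100th call: pipe is empty, a message arrives inside the window.
    bool got = s.recv(msg, [&s] { s.io_push(99); });
    return !got && s.pipe.size() == 1;
}
```

In this model demo_unpatched_hangs() reports the hang because the only revive
signal is consumed by the non-blocking process_commands() call, so the
blocking call that follows has nothing to wake it.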

Apply the patch provided previously and repeat the same test; everything
should work normally.
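
As a rough check of the patched logic, here is the same kind of toy model with
the two added lines from the patch (block = (ticks != 0) before the loop,
block = true at the end of each pass).  Again this is a sketch under
assumptions, not the library code: patched_socket, io_push and the race hook
are hypothetical names, the io thread is simulated by a callback, and a false
return stands for "would block forever".

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Value quoted in the original report (99 == inbound_poll_rate - 1).
const int inbound_poll_rate = 100;

// Toy stand-in for the patched socket; every name here is hypothetical.
struct patched_socket {
    std::deque<int> pipe;        // messages pushed by the "io thread"
    int signals = 0;             // pending revive signals
    bool reader_asleep = false;  // set once xrecv() has seen an empty pipe
    int ticks = 0;

    void io_push(int msg) {
        pipe.push_back(msg);
        if (reader_asleep) { ++signals; reader_asleep = false; }
    }

    bool xrecv(int &msg) {
        if (pipe.empty()) { reader_asleep = true; return false; }
        msg = pipe.front();
        pipe.pop_front();
        return true;
    }

    // Returns false where the real call would sleep forever.
    bool process_commands(bool block) {
        if (signals > 0) { signals = 0; return true; }
        return !block;
    }

    // Patched logic.  `race` runs between xrecv() and the ticks check.
    bool recv(int &msg, const std::function<void()> &race = [] {}) {
        bool ok = xrecv(msg);
        race();
        if (++ticks == inbound_poll_rate) {
            process_commands(false);
            ticks = 0;
        }
        // If commands were just processed (ticks == 0), do not block on
        // the first pass: the revive signal may already be consumed.
        bool block = (ticks != 0);
        while (!ok) {
            if (!process_commands(block))
                return false;  // would block forever
            ok = xrecv(msg);
            ticks = 0;
            block = true;
        }
        return true;
    }
};

// Replays the same race; returns true if the late message is delivered.
bool demo_patched_receives() {
    patched_socket s;
    int msg = 0;
    for (int i = 0; i < inbound_poll_rate - 1; ++i) {
        s.io_push(i);
        bool got = s.recv(msg);
        assert(got);
        (void) got;
    }
    bool got = s.recv(msg, [&s] { s.io_push(99); });
    return got && msg == 99 && s.pipe.empty();
}
```

With the first pass forced to be non-blocking, the model retries xrecv() once
before it is allowed to sleep, which is where the small extra cost mentioned
in the patch description comes from.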


On Tue, Nov 9, 2010 at 2:15 PM, Marc Rossi <mrossi19 at gmail.com> wrote:

> Main thread calls recv() and hangs forever (after working fine for a period
> of time), memory usage grows continuously while io thread pulls data from
> socket and pushes on the internal queue. netstat -a shows no data in recv-q
> because io thread continues to work properly and pull data from the socket.
>
> This occurs under the following scenario:
>
> User code calls socket_base_t::recv() indirectly through a higher-level
> zeromq API call when there are no messages waiting. The previous 99
> (inbound_poll_rate - 1) calls to recv() each returned an already
> waiting message fetched by the xrecv() call at the start of the function.
>
> This 100th call to recv(), as stated above, has no messages waiting to be
> read, so the xrecv() call fails and rc = -1. Immediately after this call to
> xrecv() but BEFORE the conditional statement "if (++ticks ==
> inbound_poll_rate)" a message arrives and is processed by the io thread,
> resulting in the generation of a revive signal as the new message is pushed
> onto the queue. Since ++ticks is now 100 (inbound_poll_rate) the above
> conditional is true and app_thread_t::process_commands() is called,
> processing the revive signal.
>
> Since this is a BLOCKING socket and rc != 0 we fall down to the loop at the
> end of the recv() function that unfortunately for us calls the
> app_thread_t::process_commands() method with block_ = true before calling
> xrecv(). Since we already read the revive signal above we are now officially
> hung as there is still a message in the queue and there will be no more
> revive signals generated by the io thread because of that.
>
> To test that this is indeed what is happening I did the following. Added an
> integer reference as a third parameter to the
> app_thread_t::process_commands() method that is set to the number of
> commands received and processed. Immediately before AND after calling
> process_commands() method in the final loop of socket_base_t::recv() I added
> a debug print statement that is executed ONLY if the prior call to
> process_commands() returned a value > 0 for the third param. After running
> the test code for about an hour the scenario described above occurred with
> the debug print prior to the process_commands() call being displayed and
> then the process was hung.
>
> Below is the simple patch that seems to fix the problem for me. This will
> incur a small penalty when ticks == 0 and there are no messages waiting to
> be read as the initial call to process_commands will return immediately due
> to block being set to false. This could be made more efficient if the
> process_commands() method took a 3rd param as a bool that was set to true if
> commands were actually processed, then we would ONLY set block = false when
> the previous call to process_commands() actually did something, not rely on
> the ticks = 0 line in the if/then block.
>
>
>
>
>
> From 8d45a82d9cf7b788a3bed5014420962ea4ca5969 Mon Sep 17 00:00:00 2001
> From: Marc Rossi <mrossi19 at gmail.com>
> Date: Tue, 9 Nov 2010 13:46:06 -0600
> Subject: [PATCH] Fix socket_t::recv() hang scenario where initial call to
> process_commands() eats signal
>
> Added block boolean var to second process_commands() invocation for
> blocking sockets instead of always using true.  This prevents the
> process_commands() call from hanging when a message is received with an
> empty queue after the call to xrecv() but prior to the initial call to
> process_commands() invoked when ++ticks == inbound_poll_rate.
>
> Signed-off-by: Marc Rossi <mrossi19 at gmail.com>
> ---
>  src/socket_base.cpp |    4 +++-
>  1 files changed, 3 insertions(+), 1 deletions(-)
>
> diff --git a/src/socket_base.cpp b/src/socket_base.cpp
> index c933954..344b552 100644
> --- a/src/socket_base.cpp
> +++ b/src/socket_base.cpp
> @@ -437,15 +437,17 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
>
>      //  In blocking scenario, commands are processed over and over again until
>      //  we are able to fetch a message.
> +    bool block = (ticks != 0);
>      while (rc != 0) {
>          if (errno != EAGAIN)
>              return -1;
> -        if (unlikely (!app_thread->process_commands (true, false))) {
> +        if (unlikely (!app_thread->process_commands (block, false))) {
>              errno = ETERM;
>              return -1;
>          }
>          rc = xrecv (msg_, flags_);
>          ticks = 0;
> +        block = true;
>      }
>
>      rcvmore = msg_->flags & ZMQ_MSG_MORE;
> --
> 1.7.2.3
>
>