[zeromq-dev] Effect of RCVHWM on pull socket at context termination?

Pieter Hintjens ph at imatix.com
Mon Mar 10 16:40:36 CET 2014


Hi Jonathan,

Sorry for not replying. I somehow missed your emails.

When you close TCP sockets, you will lose any data that hasn't yet
been received. There's no way to force data to be received except by
some kind of handshake back from the receiving application.

I.e., no combination of HWM settings will guarantee delivery of messages
when you shut down. You must add your own handshaking on top. A workaround
is to wait for a certain time to allow delivery. However, that's not
reliable.

You can't do handshaking directly using PUSH/PULL (as they are 1-way
sockets). You can however send an "end of job" message and have the
ventilator wait for a signal back from the sink that this message has
been received.
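
A minimal sketch of that "end of job" handshake, in case it helps. It
assumes a single ventilator feeding a single sink directly, so that
message order is preserved and the end-of-job marker arrives last. The
names END_OF_JOB, ACK, ventilator(), sink() and run_demo(), the two TCP
endpoints, and the choice of a second PUSH/PULL pair for the return
signal are all illustrative assumptions, not anything prescribed by
0MQ.

```python
import threading

END_OF_JOB = b"END"  # hypothetical sentinel; must never occur as a real payload
ACK = b"ACK"         # hypothetical acknowledgement payload


def ventilator(push, ack, num_messages):
    """Send the work, then the end-of-job marker, then wait for the sink's signal."""
    for i in range(num_messages):
        push.send(str(i).encode())
    push.send(END_OF_JOB)
    # Block here until the sink confirms it has seen END_OF_JOB.
    # Only after this returns is it safe to close the PUSH socket.
    return ack.recv() == ACK


def sink(pull, ack, received):
    """Collect messages until the end-of-job marker, then signal back."""
    while True:
        msg = pull.recv()
        if msg == END_OF_JOB:
            break
        received.append(msg)
    ack.send(ACK)


def run_demo(num_messages=10):
    # pyzmq is imported here so the two helpers above stay library-agnostic.
    import zmq

    data_addr = "tcp://127.0.0.1:5557"  # hypothetical endpoints
    ack_addr = "tcp://127.0.0.1:5558"
    received = []

    def sink_thread():
        ctx = zmq.Context()
        pull = ctx.socket(zmq.PULL)
        pull.setsockopt(zmq.RCVHWM, 1)  # low receive HWM, as in the original script
        pull.connect(data_addr)
        ack = ctx.socket(zmq.PUSH)      # return channel, sink -> ventilator
        ack.connect(ack_addr)
        sink(pull, ack, received)
        pull.close()
        ack.close()
        ctx.term()

    t = threading.Thread(target=sink_thread)
    t.start()

    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.bind(data_addr)
    ack = ctx.socket(zmq.PULL)
    ack.bind(ack_addr)
    ok = ventilator(push, ack, num_messages)
    push.close()
    ack.close()
    ctx.term()
    t.join()
    return ok, received
```

Calling run_demo() with pyzmq installed should return once the sink has
acknowledged the marker. If workers sit between the ventilator and the
sink, ordering across them is not guaranteed, so the sink would need to
count results rather than rely on the marker arriving last.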

-Pieter



On Mon, Feb 17, 2014 at 10:06 PM, Jonathan Kamens
<jkamens at quantopian.com> wrote:
> I'm surprised that there has been no response to the question below, which I
> posted a week ago. Is there some other, more appropriate forum in which I
> should post it?
>
> It seems to me that the behavior I observed, which is easily demonstrated by
> the Python script I included in my message, reflects at least a significant
> documentation gap which should be filled in (there is no explanation in the
> documentation of the effect of RCVHWM on a PULL socket), and at most a
> zeromq bug which leads to data loss (contrary to what the documentation
> says, zmq_term() allows a context to be terminated when not all messages
> have been transmitted).
>
> Thank you,
>
> Jonathan Kamens
>
>
> On 02/10/2014 04:24 PM, Jonathan Kamens wrote:
>
> As far as I can tell from reading all of the documentation I can find, by
> default when you use a PUSH/PULL socket and don't mess with the LINGER
> socket option, when you terminate your 0MQ context at the PUSH end, it
> should hang until all the messages have been successfully delivered to the
> PULL end.
>
> What effect does setting RCVHWM, i.e., the receive high water mark, on the
> PULL end have on this? The documentation appears to be completely silent on
> this question. For example, the zmq_socket documentation says nothing about
> the effect of setting RCVHWM on a PULL socket.
>
> Counterintuitively, it appears that if you set RCVHWM too low on the PULL
> end, in fact the socket termination does not hang on the PUSH end, and
> messages are lost. The script I've appended to the end of this message
> demonstrates this behavior. When I run it, the last line of output it prints
> is "Received 7", and then it hangs, i.e., two messages are lost. When I
> comment out the "sock.setsockopt(zmq.RCVHWM, 1)" line or set the high water
> mark to 10 instead of 1, then all messages are successfully received.
>
> We need to be able to set the high water mark to a relatively low
> value at both the PUSH and PULL ends of the sockets our application uses,
> because we actually want the socket sends to block at the PUSH end if the
> PULL end falls too far behind in processing them, to prevent memory usage by
> 0MQ from ballooning in the processes at both ends of the socket.
>
> Is the behavior we are seeing expected, or a bug? If expected, then (a) is
> it documented anywhere (I'm trying to figure out what documentation I should
> have read but didn't) and (b) what pattern can/should we follow to achieve
> our intended goal of reliable delivery of the entire stream without memory
> usage by 0MQ ballooning at either end if the PULL end falls behind?
>
> I am using 0MQ 3.2.4, PyZMQ 14.0.1, and python 2.7.5 on Fedora 20 Linux
> (x86_64).
>
> Thanks,
>
> Jonathan Kamens
> ________________________________
> from multiprocessing import Process
> import time
> import zmq
>
> addr = 'ipc:///tmp/test-socket'
> num_messages = 10
> send_duration = 2
> receive_duration = 4
>
> def sender():
>     # Send num_messages (10) messages over send_duration (2) seconds
>     ctx = zmq.Context()
>     sock = ctx.socket(zmq.PUSH)
>     sock.bind(addr)
>     sleep_time = float(send_duration) / num_messages
>     for i in range(num_messages):
>         sock.send(str(i))
>         print 'Sent %d' % i
>         time.sleep(sleep_time)
>     sock.close()
>     ctx.term()
>     print 'Sender exiting'
>
> Process(target=sender).start()
>
> ctx = zmq.Context()
> sock = ctx.socket(zmq.PULL)
> sock.setsockopt(zmq.RCVHWM, 1)
> sock.connect(addr)
> sleep_time = float(receive_duration) / num_messages
> for i in range(num_messages):
>     msg = sock.recv()
>     print 'Received %s' % msg
>     time.sleep(sleep_time)
> sock.close()
> ctx.term()
>
>
>
>


