[zeromq-dev] Some suggestions for ZeroMQ

Victor Lin bornstub at gmail.com
Sun Jan 22 08:47:57 CET 2012


Hi guys,

I'm developing an audio streaming system based on ZeroMQ, and I have run
into some problems. I have a group of servers that collect audio data from
the Internet. Let's say

    Audio server 1 (PUB connect)
    Audio server 2 (PUB connect)
    ....
    Audio server N (PUB connect)

Each audio server opens a PUB socket that connects to a domain socket, and
there is one server that processes the audio data, let's say

    Audio processor (SUB bind)

Obviously, the audio processor should bind to a well-known address, because
the number of audio servers is unknown. So far so good, the design looks
fine, but here comes the problem: if the audio processor is not running,
each connecting PUB socket queues all of its outgoing data. Memory usage
grows higher and higher, and eventually my server crashes.

I posted this problem at
http://stackoverflow.com/questions/8952091/zeromq-pub-socket-buffers-all-my-out-going-data-when-it-is-connecting

This behavior surprises me, and I don't understand the reason for it. A
socket in ZeroMQ can connect and bind to multiple addresses, so I thought
all I had to do was design the topology, but I was wrong: connecting and
binding sockets behave differently. I use PUB/SUB precisely because I don't
care about messages sent before the connection is made, and I choose bind
or connect based on what the topology needs. This difference between
connecting and binding sockets forces me to make the system more complex to
achieve the same job. Let's see: bind on the audio servers and connect from
the audio processor.

    Audio server 1 (PUB bind)
    Audio server 2 (PUB bind)
    ....
    Audio server N (PUB bind)

And for audio processor

    Audio processor (SUB connect)

As you can see, we now need a separate address for each of the N audio
servers, and another problem arises: how does the audio processor know the
addresses of the audio servers? A name server has to be added in this case,
just because connecting and binding sockets behave differently.

You may ask about another solution: HWM. Yes, HWM can of course be set to
limit memory usage, but when the pipe is full, new messages are dropped.
Let's say you send

    msg 1
    msg 2
    msg 3
    ....
    msg N

If old messages are important, dropping new messages is fine. Let the HWM
be 4; when the connection is eventually made, the receiver gets

    msg 1
    msg 2
    msg 3
    msg 4
    (dropped)
    msg K
    msg K + 1
    ...

But in most cases new messages are far more important than old ones. In
audio streaming, for example, old messages are useless; only new ones matter.

I have reviewed the ZeroMQ source code, and as far as I can tell, messages
sent on a connecting socket are kept in a pipe; the pipes are created and
attached immediately in the connect method of socket_base:

        //  Create a bi-directional pipe to connect the peers.
        object_t *parents [2] = {this, peer.socket};
        pipe_t *pipes [2] = {NULL, NULL};
        int hwms [2] = {sndhwm, rcvhwm};
        bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
        int rc = pipepair (parents, pipes, hwms, delays);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to this socket object.
        attach_pipe (pipes [0]);

On the other hand, for a binding socket the pipes are created and attached
only when a connection is made. This is why a connecting socket buffers
data but a binding socket doesn't.

As for HWM, pipe_t::write checks the number of queued messages against the
HWM and rejects new messages once the queue is full:

    bool zmq::pipe_t::check_write (msg_t *msg_)
    {
        if (unlikely (!out_active || state != active))
            return false;

        bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);

        if (unlikely (full)) {
            out_active = false;
            return false;
        }

        return true;
    }

    bool zmq::pipe_t::write (msg_t *msg_)
    {
        if (unlikely (!check_write (msg_)))
            return false;

        bool more = msg_->flags () & msg_t::more ? true : false;
        outpipe->write (*msg_, more);
        if (!more)
            msgs_written++;

        return true;
    }

IMHO, some parts of the design should be changed.

1. Add an option to determine whether to buffer data on connecting sockets

I'm not sure why connecting sockets buffer data, but I guess it is to make
synchronization easier. To make the behavior less surprising, it would be
better to connect without buffering, namely to create and attach the pipe
only when the connection is made. You could also provide another function,
for example

    connect_with_buffer()

to connect with the pipe attached immediately.

2. Add an option to determine how to drop messages when the HWM is reached

As I said, new messages are more important in most cases. It would be
better to have an option that determines whether the new or the old
message is dropped.

I would like to fork the project on GitHub and make some patches, but I'm
in a rush with my own project. For now, all I can do is adapt the system
to bind an address on each client and connect from the server; it's ugly,
but it works. I'll see whether I have spare time to contribute. I hope this
is helpful.

Best regards,
Victor Lin.