[zeromq-dev] 0MQ v2 : Parallelized Pipeline UPSTREAM/DOWNSTREAM Sockets Over Direct Tcp

Tom Marthaler tmarthal at neuraliq.com
Fri Apr 9 04:09:26 CEST 2010


I was trying to use zeromq to create a guaranteed delivery to a downstream
socket. I have a couple questions:

1) Guaranteed delivery does not seem to work without the zmq_streamer
appliance storing the messages.

Using the zmq_streamer device, if I have a process called pipeline-start
sending messages via a downstream socket to the <in> streamer element and a
pipeline-end process receiving messages from an upstream socket, then the
delivery works as expected: start sends messages 1,2,3,4,5 (pipeline-end is
started; receives messages 1-5), sends 6 (receive 6), send 7 (recieve 7),
kill pipeline-end, send 8, send 9, send 10 (restart pipeline-end, receive
messages 8-10), send 11 (receive 11), etc. This is the expected behavior for
the guaranteed delivery.  [Note that when the pipeline-start process sends
messages to the zmq_streamer, it uses socket.connect()]


However, on the pipeline-start process, when I bind the downstram socket to
an interface, and send messages using the downstream socket directly to a
process with a connected upstream socket, the guaranteed delivery does not
work. In this case, pipeline-start is does the bind() on the socket, while
pipeline-end still does the connect().

pipeline-start:
    //  Initialise 0MQ infrastructure.
    zmq::context_t ctx (1,1,0);

    // the downstream socket for sending stuff to the next node
    zmq::socket_t s(ctx, ZMQ_DOWNSTREAM);
    s.bind("tcp://eth0:5556"); //

    char textbuf [1024];

    int m_count = 0;
    while (true) {
        snprintf (textbuf, sizeof (textbuf), "%d: ", m_count); // the
message is just the counter

        zmq::message_t msg (strlen (textbuf) + 1);
        memcpy (msg.data (), textbuf, msg.size ());

        //  Send the message
        cout << "trying to send " << textbuf << endl;
        int rc = s.send(msg, ZMQ_NOBLOCK);
        if (rc > 0) {
            cout << "successfully sent message " << rc << endl;
            m_count++;
        }
        else if (rc == 0) {
            cout << "successfully queued message " << endl;
        }
        sleep(1);
    }

pipeline-end:
   //  Initialise 0MQ infrastructure.
    zmq::context_t ctx (1, 1);

    // the upstream socket for receiving stuff from the previous node
    zmq::socket_t s(ctx,    ZMQ_UPSTREAM);

    s.connect("tcp://192.168.1.1:5556);


    char textbuf [1024];

    while (true) {
        // Receive and display the result
        zmq::message_t resultset;
        s.recv (&resultset);
        const char *resultset_string = (const char *)resultset.data ();
        cout <<  "Received response: " << resultset_string << endl;
    }


So, when the send() method is called on the pipeline-start socket, if the
pipeline-end process is available, then the message is delivered and the
socket.send() method returns a value of 1. If the pipeline-end process has
not started the socket.send() returns 0 ; according to the zmq_send man
page, 0 means successful queueing of the message onto the zeromq queue.
However, when pipeline-end comes back online, the queued messages are not
sent. Are the messages supposed to be sent?

==



2) I cannot get guaranteed message delivery to multiple downstream sockets.
Using the zmq_streamer case above, with two pipeline-end processes
connecting their upstream sockets to the same downstream socket ip, then
they seem to alternate message delivery. In other words: process end1 and
end2 start, start pipeline-start sends message 1 (end1 receives msg1), send
messag 2 (end2 receives msg2), send message 3 (end1 receives msg3), send
message 4 (end receives msg4), ...

And if both downstream sockets are disconnected, then the first one to
connect receives the undelivered queue of messages, rather than both of
them. So the upstream socket is keeping state of which messages are
delivered, but not to each connection?
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20100408/a601c15a/attachment.html>


More information about the zeromq-dev mailing list