[zeromq-dev] C Based ZeroMQ Aggregation Server Problems...

Matthew West mwest at zynga.com
Mon Oct 17 22:12:46 CEST 2011


Hey gang,

  We’ve been working with ZeroMQ and have had a lot of success using it to move data around to wherever we want it to go. Lately, however, we are seeing a problem with our Aggregation Server (Sink), which is really just a PULL socket that receives a message and then republishes it on a PUB socket. Everything runs smoothly for a while, sometimes even the better part of a day. At some point, though, ZeroMQ seems to block while shuttling messages from the PULL socket to the PUB socket. According to netstat the port is still accessible, but all of the data I would expect to see published is instead building up in memory held by the ZeroMQ PULL socket. That is expected, considering we aren’t draining the messages correctly: since the PULL socket can’t empty its queue, it continues to receive messages but never pushes them to the publish port and never frees the message buffer.

  The following is the current C code for the server we have dubbed the Sink. We are running ZeroMQ v2.1.10, and we have also tried 2.1.8 and seen the same results. We have client daemons connected from approximately 700 nodes at this point, pushing data to this server. Any assistance with this matter would be great; we’ve been trying to understand why the socket seems to become unreachable at random times while trying to send data out the publish port. Running strace -p <pid> -f on the process shows it doing recvfrom with msg data in the payload, but we never see that data sent back out the publish port.

Thanks again,

//
//  Aggregation Server
//
#include "zhelpers.h"
#include <stdio.h>

void my_free (void *data,void *hint) {
    free(data);
}

int main () {
    s_version ();

    void *context = zmq_init (2);

    // Sink socket
    void *sink = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sink, "tcp://*:5565");

    // Pub socket
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5566");

    zmq_pollitem_t items[] = {
        { sink,0,ZMQ_POLLIN,0 },
    };

    size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));

    while (1) {
        // Read message contents
        zmq_poll (items,poll_size,-1);
        if(items [0].revents & ZMQ_POLLIN) {
            zmq_msg_t buf;
            zmq_msg_init (&buf);
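            if(zmq_recv(sink,&buf,0)) {
                // Report why recv failed and release the message before retrying
                printf("WARNING recv failure: %s\n",zmq_strerror(zmq_errno()));
                zmq_msg_close(&buf);
                continue;
            }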
            int size = zmq_msg_size(&buf);
            char *string = malloc(size+1);
            memcpy(string,zmq_msg_data(&buf),size);
            string[size] = 0;
            zmq_msg_close(&buf);

            zmq_msg_t msg;
            zmq_msg_init_data(&msg,string,size,my_free,NULL);
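            // zmq_send returns non-zero on failure; log it rather than drop silently
            if(zmq_send(publisher,&msg,0)) {
                printf("WARNING send failure: %s\n",zmq_strerror(zmq_errno()));
            }
            zmq_msg_close(&msg);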
        }
    }

    // We never get here but clean up anyhow
    printf("WARNING exiting\n");
    zmq_close (publisher);
    zmq_close (sink);
    zmq_term (context);
    return 0;
}
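
One part of the loop above that may be worth hardening is the copy step, which stores the message size in an int and uses the result of malloc without checking it. A minimal sketch of how that step could be factored out follows. The name copy_payload is hypothetical (it is not part of ZeroMQ or zhelpers.h), and this is not offered as the cause of the stall, only as a way to rule out one silent failure.

```c
#include <stdlib.h>
#include <string.h>

// Hypothetical helper (not part of ZeroMQ or zhelpers.h): copy `size` bytes
// from `data` into a fresh heap buffer and append a terminating NUL.
// Returns NULL if the allocation fails, so the caller can skip the send.
char *copy_payload (const void *data, size_t size) {
    if (size == (size_t) -1)        // size + 1 would wrap around to 0
        return NULL;
    char *copy = malloc (size + 1);
    if (copy == NULL)
        return NULL;
    if (size > 0)
        memcpy (copy, data, size);
    copy [size] = 0;                // terminator is not counted in `size`
    return copy;
}
```

In the loop this would be called as copy_payload (zmq_msg_data (&buf), zmq_msg_size (&buf)), logging a warning and skipping the send when it returns NULL. The returned buffer can still be handed to zmq_msg_init_data with my_free as the free function, since it comes from malloc.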

--

Matt West