[zeromq-dev] C Based ZeroMQ Aggregation Server Problems...

Pieter Hintjens ph at imatix.com
Mon Oct 17 22:25:39 CEST 2011


Hi Matthew,

A couple of comments and questions:

- why are you using 2 i/o threads? That is unnecessary
- what's your message rate?
- how many CPUs on the system?
- why create the second message via a temporary string, when you can
just send the message you received, as-is?

The code otherwise looks fine. So what you're seeing is that the code
blocks in the zmq_poll() call and never returns?

-Pieter

On Mon, Oct 17, 2011 at 3:12 PM, Matthew West <mwest at zynga.com> wrote:
> Hey gang,
>
>   So we’ve been working with ZeroMQ and had a lot of success with using it
> to provide a highly agile environment when it comes to moving data around
> where ever we may want it to go. Unfortunately we are seeing a problem as of
> lately with our Aggregation Server (Sink), which is in all actuality a Pull
> Socket which receives a message and then publishes it to another Pub Socket.
> Everything runs smoothly for a little while, sometimes even the better part
> of a day. Unfortunately however, it would seem like ZeroMQ gets blocked at
> some point with trying to shuttle the messages from the pull socket to the
> publish socket. According to netstat, the port is still accessible, but all
> of the information I would expect to see is actually building up in a memory
> cache being controlled by zeromq pull socket, this is expected considering
> we aren’t expunging the messages correctly. Essentially since the Pull
> socket can’t empty its messages, it just continues to receive them but never
> pushes them to the publish port and never performs a free on the msg buffer
> / message.
>
>   The following is our current C code server that we have dubbed the Sink.
> We are running zeromq v2.1.10, and we have also tried this out with 2.1.8
> and seen the same results. We have client daemons connected from
> approximately 700 nodes at this point, pushing data to this server. Any
> assistance with this matter would be great, we’ve been trying to understand
> why it seems like the socket just becomes unreachable at random times while
> trying to print information to the publish port. Running a strace –p <pid>
> -f on the process shows us doing recvfrom with msg data in the payload, but
> we are never seeing that data be printed back out the publish port.
>
> Thanks again,
>
> //
> //  Aggregation Server
> //
> #include "zhelpers.h"
> #include <stdio.h>
>
> void my_free (void *data,void *hint) {
>     free(data);
> }
>
> int main () {
>     s_version ();
>
>     void *context = zmq_init (2);
>
>     // Sink socket
>     void *sink = zmq_socket (context, ZMQ_PULL);
>     zmq_bind (sink, "tcp://*:5565");
>
>     // Pub socket
>     void *publisher = zmq_socket (context, ZMQ_PUB);
>     zmq_bind (publisher, "tcp://*:5566");
>
>     zmq_pollitem_t items[] = {
>         { sink,0,ZMQ_POLLIN,0 },
>     };
>
>     size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));
>
>     while (1) {
>         // Read message contents
>         zmq_poll (items,poll_size,-1);
>         if(items [0].revents & ZMQ_POLLIN) {
>             zmq_msg_t buf;
>             zmq_msg_init (&buf);
>             if(zmq_recv(sink,&buf,0)) {
>                 printf("WARNING recv failure\n");
>                 continue;
>             }
>             int size = zmq_msg_size(&buf);
>             char *string = malloc(size+1);
>             memcpy(string,zmq_msg_data(&buf),size);
>             string[size] = 0;
>             zmq_msg_close(&buf);
>
>             zmq_msg_t msg;
>             zmq_msg_init_data(&msg,string,size,my_free,NULL);
>             zmq_send(publisher,&msg,0);
>             zmq_msg_close(&msg);
>         }
>     }
>
>     // We never get here but clean up anyhow
>     printf("WARNING exiting\n");
>     zmq_close (publisher);
>     zmq_close (sink);
>     zmq_term (context);
>     return 0;
> }
>
> --
>
> Matt West
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>



More information about the zeromq-dev mailing list