[zeromq-dev] C Based ZeroMQ Aggregation Server Problems...
Pieter Hintjens
ph at imatix.com
Mon Oct 17 22:25:39 CEST 2011
Hi Matthew,
A couple of comments and questions:
- why are you using 2 i/o threads? That is unnecessary.
- what's your message rate?
- how many CPUs on the system?
- why create the second message via a temporary string, when you can
just send the message you received, as-is?
The code otherwise looks fine. So what you're seeing is that the code
blocks in the zmq_poll() call and never returns?
-Pieter
On Mon, Oct 17, 2011 at 3:12 PM, Matthew West <mwest at zynga.com> wrote:
> Hey gang,
>
> We've been working with ZeroMQ and have had a lot of success using it to
> move data wherever we want it to go. Lately, however, we are seeing a
> problem with our Aggregation Server (Sink), which is a PULL socket that
> receives each message and republishes it on a PUB socket. Everything runs
> smoothly for a while, sometimes for the better part of a day. At some
> point, though, ZeroMQ seems to block while shuttling messages from the
> PULL socket to the PUB socket. According to netstat the port is still
> accessible, but the data I would expect to see published is building up
> in memory held by the PULL socket. That is expected, given that we are no
> longer draining the messages: since the PULL socket can't hand its
> messages off, it keeps receiving them but never pushes them to the publish
> port and never frees the message buffers.
>
> The following is our current C server code, which we have dubbed the Sink.
> We are running zeromq v2.1.10, and we have also tried 2.1.8 with the same
> results. Client daemons on approximately 700 nodes are currently connected
> and pushing data to this server. Any assistance would be great; we've been
> trying to understand why the socket seems to become unreachable at random
> times while writing to the publish port. Running strace -p <pid> -f on the
> process shows recvfrom calls with message data in the payload, but we
> never see that data written back out on the publish port.
>
> Thanks again,
>
> //
> // Aggregation Server
> //
> #include "zhelpers.h"
> #include <stdio.h>
>
> void my_free (void *data, void *hint) {
>     free(data);
> }
>
> int main () {
>     s_version ();
>
>     void *context = zmq_init (2);
>
>     // Sink socket
>     void *sink = zmq_socket (context, ZMQ_PULL);
>     zmq_bind (sink, "tcp://*:5565");
>
>     // Pub socket
>     void *publisher = zmq_socket (context, ZMQ_PUB);
>     zmq_bind (publisher, "tcp://*:5566");
>
>     zmq_pollitem_t items[] = {
>         { sink, 0, ZMQ_POLLIN, 0 },
>     };
>
>     size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));
>
>     while (1) {
>         // Read message contents
>         zmq_poll (items, poll_size, -1);
>         if (items [0].revents & ZMQ_POLLIN) {
>             zmq_msg_t buf;
>             zmq_msg_init (&buf);
>             if (zmq_recv(sink, &buf, 0)) {
>                 printf("WARNING recv failure\n");
>                 continue;
>             }
>             int size = zmq_msg_size(&buf);
>             char *string = malloc(size+1);
>             memcpy(string, zmq_msg_data(&buf), size);
>             string[size] = 0;
>             zmq_msg_close(&buf);
>
>             zmq_msg_t msg;
>             zmq_msg_init_data(&msg, string, size, my_free, NULL);
>             zmq_send(publisher, &msg, 0);
>             zmq_msg_close(&msg);
>         }
>     }
>
>     // We never get here but clean up anyhow
>     printf("WARNING exiting\n");
>     zmq_close (publisher);
>     zmq_close (sink);
>     zmq_term (context);
>     return 0;
> }
>
> --
>
> Matt West
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>