[zeromq-dev] C Based ZeroMQ Aggregation Server Problems...
Henry Geddes
hgeddes at zynga.com
Mon Oct 17 23:12:13 CEST 2011
We have changed the i/o threads back to 1.
The system has 16 CPUs.
Message throughput is roughly 10-15 MB/sec. We don't have a definite message rate at this point.
Is there a good example anywhere of using zero copy from one socket to another? I don't think I fully understand the zero-copy part. I have looked over the examples, but they don't handle passing a message from one socket to another.
We have tested it using s_recv and s_send without the zero copy and see the same result, so I don't think the zero copy is the cause. I also removed the poll and we continued to see the issue, so that does not appear to be the cause either. It looks as if the pull socket just starts blocking and the incoming messages are buffered until the box runs out of RAM.
I have noticed this error: "E: unhandled error on recv: 11/Resource temporarily unavailable", but it does not seem to correlate with the issue.
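For reference, errno 11 on Linux is EAGAIN, which a receive call reports when it was asked not to block and no message was available. Which component prints the message above is not known here, so the following is only a sketch of how a receive loop might separate that case from real failures; recv_error_is_transient is a hypothetical helper, not part of ZeroMQ.

```c
#include <errno.h>

/* Returns 1 if a failed receive is worth retrying, 0 if it should be
   treated as a real error. `err` is the errno value seen after the call. */
static int recv_error_is_transient (int err)
{
    switch (err) {
    case EAGAIN:    /* non-blocking receive found no message waiting */
    case EINTR:     /* call was interrupted by a signal before a message arrived */
        return 1;
    default:
        return 0;
    }
}
```

A loop could call this with errno after a receive returns -1 and retry when it returns 1, logging and bailing out otherwise.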
Henry Geddes
-----Original Message-----
From: zeromq-dev-bounces at lists.zeromq.org [mailto:zeromq-dev-bounces at lists.zeromq.org] On Behalf Of Pieter Hintjens
Sent: Monday, October 17, 2011 1:26 PM
To: ZeroMQ development list
Subject: Re: [zeromq-dev] C Based ZeroMQ Aggregation Server Problems...
Hi Matthew,
A couple of comments and questions:
- why are you using 2 i/o threads? That is unnecessary.
- what's your message rate?
- how many CPUs on the system?
- why create the second message via a temporary string, when you can
just send the message you received, as-is?
The code otherwise looks fine. So what you're seeing is that the code
blocks in the zmq_poll() call and never returns?
-Pieter
On Mon, Oct 17, 2011 at 3:12 PM, Matthew West <mwest at zynga.com> wrote:
> Hey gang,
>
> So we've been working with ZeroMQ and have had a lot of success using it
> to provide a highly agile environment for moving data around to wherever
> we may want it to go. Unfortunately, we have lately been seeing a problem
> with our Aggregation Server (Sink), which is really just a Pull socket
> that receives a message and then publishes it to a Pub socket.
> Everything runs smoothly for a little while, sometimes even the better part
> of a day. At some point, however, ZeroMQ seems to get blocked while
> trying to shuttle the messages from the pull socket to the publish
> socket. According to netstat, the port is still accessible, but all of
> the information I would expect to see is instead building up in a memory
> cache controlled by the zeromq pull socket. That is expected, considering
> we aren't expunging the messages correctly. Essentially, since the Pull
> socket can't empty its messages, it just continues to receive them but
> never pushes them to the publish port and never performs a free on the
> msg buffer / message.
>
> The following is our current C code server that we have dubbed the Sink.
> We are running zeromq v2.1.10, and we have also tried this with 2.1.8
> and seen the same results. We have client daemons connected from
> approximately 700 nodes at this point, pushing data to this server. Any
> assistance with this matter would be great. We've been trying to understand
> why the socket seems to become unreachable at random times while
> trying to send information to the publish port. Running strace -p <pid> -f
> on the process shows it doing recvfrom with msg data in the payload, but
> we never see that data sent back out the publish port.
>
> Thanks again,
>
> //
> // Aggregation Server
> //
> #include "zhelpers.h"
> #include <stdio.h>
>
> void my_free (void *data,void *hint) {
>     free(data);
> }
>
> int main () {
>     s_version ();
>
>     void *context = zmq_init (2);
>
>     // Sink socket
>     void *sink = zmq_socket (context, ZMQ_PULL);
>     zmq_bind (sink, "tcp://*:5565");
>
>     // Pub socket
>     void *publisher = zmq_socket (context, ZMQ_PUB);
>     zmq_bind (publisher, "tcp://*:5566");
>
>     zmq_pollitem_t items[] = {
>         { sink,0,ZMQ_POLLIN,0 },
>     };
>
>     size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));
>
>     while (1) {
>         // Read message contents
>         zmq_poll (items,poll_size,-1);
>         if(items [0].revents & ZMQ_POLLIN) {
>             zmq_msg_t buf;
>             zmq_msg_init (&buf);
>             if(zmq_recv(sink,&buf,0)) {
>                 printf("WARNING recv failure\n");
>                 continue;
>             }
>             int size = zmq_msg_size(&buf);
>             char *string = malloc(size+1);
>             memcpy(string,zmq_msg_data(&buf),size);
>             string[size] = 0;
>             zmq_msg_close(&buf);
>
>             zmq_msg_t msg;
>             zmq_msg_init_data(&msg,string,size,my_free,NULL);
>             zmq_send(publisher,&msg,0);
>             zmq_msg_close(&msg);
>         }
>     }
>
>     // We never get here but clean up anyhow
>     printf("WARNING exiting\n");
>     zmq_close (publisher);
>     zmq_close (sink);
>     zmq_term (context);
>     return 0;
> }
>
> --
>
> Matt West
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>