[zeromq-dev] C Based ZeroMQ Aggregation Server Problems...

Amr Ali amr.ali.cc at gmail.com
Mon Oct 17 22:42:21 CEST 2011


Hi Pieter,

This is probably unrelated, but on a high traffic setup like the one being
discussed, wouldn't one want 2 I/O threads to handle the traffic coming in
without blocking/waiting for a busy thread to handle sending out messages on the
PUB socket?

I understand that without setting the affinity on the sockets, both threads will
be doing the receiving and the sending, but wouldn't that affect concurrency and
eventually cause congestion on that single thread if there are a lot of messages
to receive?

On 10/17/2011 10:25 PM, Pieter Hintjens wrote:
> Hi Matthew,
> 
> A couple of comments and questions:
> 
> - why are you using 2 i/o threads? That is unnecessary
> - what's your message rate?
> - how many CPUs on the system?
> - why create the second message via a temporary string, when you can
> just send the message you received, as-is?
> 
> The code otherwise looks fine. So what you're seeing is that the code
> blocks in the zmq_poll() call and never returns?
> 
> -Pieter
> 
> On Mon, Oct 17, 2011 at 3:12 PM, Matthew West <mwest at zynga.com> wrote:
>> Hey gang,
>>
>>   So we’ve been working with ZeroMQ and had a lot of success with using it
>> to provide a highly agile environment when it comes to moving data around
>> where ever we may want it to go. Unfortunately we are seeing a problem as of
>> lately with our Aggregation Server (Sink), which is in all actuality a Pull
>> Socket which receives a message and then publishes it to another Pub Socket.
>> Everything runs smoothly for a little while, sometimes even the better part
>> of a day. Unfortunately however, it would seem like ZeroMQ gets blocked at
>> some point with trying to shuttle the messages from the pull socket to the
>> publish socket. According to netstat, the port is still accessible, but all
>> of the information I would expect to see is actually building up in a memory
>> cache being controlled by zeromq pull socket, this is expected considering
>> we aren’t expunging the messages correctly. Essentially since the Pull
>> socket can’t empty its messages, it just continues to receive them but never
>> pushes them to the publish port and never performs a free on the msg buffer
>> / message.
>>
>>   The following is our current C code server that we have dubbed the Sink.
>> We are running zeromq v2.1.10, and we have also tried this out with 2.1.8
>> and seen the same results. We have client daemons connected from
>> approximately 700 nodes at this point, pushing data to this server. Any
>> assistance with this matter would be great, we’ve been trying to understand
>> why it seems like the socket just becomes unreachable at random times while
>> trying to print information to the publish port. Running a strace –p <pid>
>> -f on the process shows us doing recvfrom with msg data in the payload, but
>> we are never seeing that data be printed back out the publish port.
>>
>> Thanks again,
>>
>> //
>> //  Aggregation Server
>> //
>> #include "zhelpers.h"
>> #include <stdio.h>
>>
>> void my_free (void *data,void *hint) {
>>     free(data);
>> }
>>
>> int main () {
>>     s_version ();
>>
>>     void *context = zmq_init (2);
>>
>>     // Sink socket
>>     void *sink = zmq_socket (context, ZMQ_PULL);
>>     zmq_bind (sink, "tcp://*:5565");
>>
>>     // Pub socket
>>     void *publisher = zmq_socket (context, ZMQ_PUB);
>>     zmq_bind (publisher, "tcp://*:5566");
>>
>>     zmq_pollitem_t items[] = {
>>         { sink,0,ZMQ_POLLIN,0 },
>>     };
>>
>>     size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));
>>
>>     while (1) {
>>         // Read message contents
>>         zmq_poll (items,poll_size,-1);
>>         if(items [0].revents & ZMQ_POLLIN) {
>>             zmq_msg_t buf;
>>             zmq_msg_init (&buf);
>>             if(zmq_recv(sink,&buf,0)) {
>>                 printf("WARNING recv failure\n");
>>                 continue;
>>             }
>>             int size = zmq_msg_size(&buf);
>>             char *string = malloc(size+1);
>>             memcpy(string,zmq_msg_data(&buf),size);
>>             string[size] = 0;
>>             zmq_msg_close(&buf);
>>
>>             zmq_msg_t msg;
>>             zmq_msg_init_data(&msg,string,size,my_free,NULL);
>>             zmq_send(publisher,&msg,0);
>>             zmq_msg_close(&msg);
>>         }
>>     }
>>
>>     // We never get here but clean up anyhow
>>     printf("WARNING exiting\n");
>>     zmq_close (publisher);
>>     zmq_close (sink);
>>     zmq_term (context);
>>     return 0;
>> }
>>
>> --
>>
>> Matt West
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev

-- 
Amr Ali

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 198 bytes
Desc: OpenPGP digital signature
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20111017/15f6a0e6/attachment.sig>


More information about the zeromq-dev mailing list