[zeromq-dev] subscriber too slow

Jan Müller 217534 at gmail.com
Tue Nov 20 14:32:57 CET 2012


(My current ØMQ version is 3.2.2)

I have one thread which produces data (40MB/s) and publishes it over a PUB 
socket as 2kB messages. Multiple clients can subscribe (SUB sockets). One 
of the SUB clients should save the data to disk. However, that client 
loses packets, while the other clients, which don't save the data to disk, 
receive all messages.

The disk *should* be fast enough to handle the data rate; I tested with:

time dd if=/dev/zero of=data.tmp bs=2k count=1000000; time sync

1000000+0 records in
1000000+0 records out
2048000000 bytes (2.0 GB) copied, 11.7767 s, 174 MB/s

real    0m11.930s
user    0m0.235s
sys     0m7.886s

real    0m2.064s
user    0m0.001s
sys     0m0.011s
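
To put these timings next to the stream rate, here is a rough sketch of the arithmetic. It assumes 1 MB = 10^6 bytes and a payload of 2048 bytes per message; the function names are made up for illustration only.

```cpp
#include <cassert>
#include <cstdint>

// Messages per second for a given byte rate and message size (rounded down).
inline std::uint64_t messages_per_second(std::uint64_t bytes_per_second,
                                         std::uint64_t bytes_per_message) {
    assert(bytes_per_message > 0);
    return bytes_per_second / bytes_per_message;
}

// Bytes that pile up if the subscriber stops reading for stall_seconds
// while the publisher keeps sending at bytes_per_second.
inline std::uint64_t backlog_bytes(std::uint64_t bytes_per_second,
                                   std::uint64_t stall_seconds) {
    return bytes_per_second * stall_seconds;
}
```

With 40 MB/s and 2048-byte messages this gives about 19531 messages per second. A stall of 2 seconds, roughly what the sync above took, would correspond to about 80 MB of backlog that has to be buffered somewhere.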

The architecture is very simple, below is the relevant code for the 
publisher and subscriber.

I suspect that at times the client is very busy saving/syncing the data to 
disk, although averaged over time the rate should be fine. I experimented 
with different ZMQ_RCVHWM/ZMQ_RCVBUF values, but this did not seem to have 
any effect. With the HWM set to 0, shouldn't the client's memory footprint 
grow to buffer the incoming messages whenever the file writing takes 'too' 
long? I monitored the memory consumption with "top" but didn't see any 
increase.

Also, I tried with inproc sockets in the same process as well as with tcp 
sockets running on two machines. Similar behavior.

20MB/s has "only a few" lost packets
40MB/s has more and
80MB/s has a "considerable amount" of lost packets.

I didn't really measure the exact amount, as any lost packet is unacceptable 
for me atm.
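
For an exact count, a small helper along these lines might do. It is only a sketch: the class name is hypothetical, and it assumes every message starts with a 64-bit frame number in the same byte order on both ends, that frame numbers increase by one, and that messages are not reordered.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper: counts received and lost frames from the frame
// number stored in the first 8 bytes of each message.
class FrameGapCounter {
public:
    // Feed one raw message; returns how many frames were skipped
    // immediately before it.
    std::uint64_t observe(const void* data, std::size_t size) {
        assert(data != nullptr);
        if (size < sizeof(std::uint64_t)) return 0; // no frame number present
        std::uint64_t frame_no = 0;
        std::memcpy(&frame_no, data, sizeof(frame_no));
        std::uint64_t missing = 0;
        if (have_last_ && frame_no > last_ + 1) missing = frame_no - last_ - 1;
        last_ = frame_no;
        have_last_ = true;
        ++received_;
        lost_ += missing;
        return missing;
    }

    std::uint64_t received() const { return received_; }
    std::uint64_t lost() const { return lost_; }

private:
    std::uint64_t last_ = 0;
    std::uint64_t received_ = 0;
    std::uint64_t lost_ = 0;
    bool have_last_ = false;
};
```

Calling observe(msg.data(), msg.size()) for every received message and printing lost() and received() at the end would give the loss ratio for each data rate.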

I thought, if the file writing is very peaky, I simply need a larger buffer.
Is my RCV buffer big enough? Shouldn't it grow dynamically w/ HWM set to 0?
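
One way to get a buffer that is certain to grow would be to keep it in the application: one thread only receives and pushes into a queue, a second thread pops and writes to disk. The following is a minimal sketch under that assumption. The class name is hypothetical, the queue is unbounded on purpose, and it copies each payload into a std::vector<char>.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical unbounded hand-off queue between a receiving thread and a
// disk-writing thread.
class MessageQueue {
public:
    using Message = std::vector<char>;

    // Called by the receiving thread for every message.
    void push(Message m) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_);
            queue_.push_back(std::move(m));
        }
        ready_.notify_one();
    }

    // Called by the writer thread. Blocks until a message is available.
    // Returns false only when the queue is closed and fully drained.
    bool pop(Message& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) return false;
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    // Signals that no more messages will be pushed.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Number of messages currently waiting to be written.
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Message> queue_;
    bool closed_ = false;
};
```

Watching size() over time would also show directly whether the writer falls behind only in bursts or continuously.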

Should I chunk multiple messages together with multipart messages and by 
this reduce the number of times the "fwrite" function gets called?
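
The number of fwrite calls could also be reduced on the subscriber side alone, without multipart messages, by collecting several payloads in memory and writing them in one go. A rough sketch, with a hypothetical class name and a flush threshold chosen by the caller:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical helper: stages message payloads in memory and hands them to
// fwrite in larger chunks.
class BatchWriter {
public:
    BatchWriter(std::FILE* out, std::size_t flush_threshold)
        : out_(out), threshold_(flush_threshold) {
        assert(out != nullptr);
        buffer_.reserve(flush_threshold);
    }

    // Copies one message into the staging buffer and writes the buffer out
    // once it holds at least flush_threshold bytes.
    bool append(const void* data, std::size_t size) {
        const char* p = static_cast<const char*>(data);
        buffer_.insert(buffer_.end(), p, p + size);
        if (buffer_.size() >= threshold_) return flush();
        return true;
    }

    // Writes whatever is staged; returns false on a short write.
    bool flush() {
        if (buffer_.empty()) return true;
        std::size_t written =
            std::fwrite(buffer_.data(), 1, buffer_.size(), out_);
        bool ok = (written == buffer_.size());
        buffer_.clear();
        ++fwrite_calls_;
        return ok;
    }

    std::size_t fwrite_calls() const { return fwrite_calls_; }

private:
    std::FILE* out_;
    std::size_t threshold_;
    std::vector<char> buffer_;
    std::size_t fwrite_calls_ = 0;
};
```

In the receive loop, append(msg.data(), msg.size()) would replace the direct fwrite call, with a final flush() before closing the file. Whether this helps is an open question, since stdio already buffers writes internally.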

Do you have a suggestion? Am I missing something?




// Subscriber
zmq::context_t *zmq_context = new zmq::context_t(1);
zmq::socket_t socket (*zmq_context, ZMQ_SUB);
// Options are set before connect(): they only apply to subsequent
// bind/connect calls.
int val = 20000 * 40; // OK???
socket.setsockopt(ZMQ_RCVHWM, &val, sizeof (val));
val = 20000 * 40 * 2048;
socket.setsockopt(ZMQ_RCVBUF, &val, sizeof (val));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); // subscribe to all messages
socket.connect ("tcp://");

for (;;) {
     zmq::message_t msg;
     socket.recv(&msg); // blocks until the next message arrives
     fwrite( msg.data(), sizeof(unsigned char), msg.size(), of );
     static unsigned long long last_frame_no = 0;
     unsigned long long frame_no = 0;
     memcpy( &frame_no, msg.data(), sizeof(unsigned long long) );

     /* Sanity Check for the FrameNo */
     if ( frame_no > last_frame_no+1 )
         fprintf(stderr,"frames missing %llu\n",
             frame_no - last_frame_no - 1);

     last_frame_no = frame_no;
}


// Publisher
zmq::context_t *zmq_context = new zmq::context_t(1);
zmq::socket_t workers (*zmq_context, ZMQ_PUB);
// The HWM is set before bind(): it only applies to subsequent
// bind/connect calls.
int hwm = 0; // OK???
workers.setsockopt(ZMQ_SNDHWM, &hwm, sizeof (hwm));
//workers.bind ("inproc://workers");
workers.bind ("tcp://*:3333");

unsigned long long frameno = 0;
for(;;) {
     zmq::message_t message3(sizeof(unsigned long long) +
         1024 * sizeof(unsigned short));
     memcpy( message3.data(), &frameno, sizeof(unsigned long long) );
     // The offset is in bytes, so cast to char* before adding; with an
     // unsigned short* it would be scaled by sizeof(unsigned short).
     memcpy( static_cast<char *>(message3.data())
         + sizeof(unsigned long long), ordered_buffer,
         1024 * sizeof(unsigned short) );
     workers.send(message3);
     ++frameno; // one frame number per message
}
