[zeromq-dev] syncpub/syncsub example losing messages (ZMQ 3.2.0-rc1, Solaris 11)
Alex Keahan
akeahan at gbtradingllc.com
Wed Jul 18 01:23:32 CEST 2012
ZMQ 3.2.0-rc1 was built with the latest Sun Studio C/C++ compilers,
with CFLAGS and CXXFLAGS=-fast -library=stlport4.
$ uname -a
SunOS xxxxxx 5.11 11.0 i86pc i386 i86pc
$ CC -V
CC: Sun C++ 5.12 SunOS_i386 2011/11/16
$ cc -V
cc: Sun C 5.12 SunOS_i386 2011/11/16
The syncpub/syncsub example from the guide was adapted to 3.2.0 (see
the code below).
The original version of syncsub would simply get stuck in zmq_recv().
I added extra code to detect sequence gaps and break out of the loop
whenever there's a discrepancy; now all syncsubs print "received N
updates" where N is somewhat random. Slowing down the producer fixes
the problem. Zmq reports no errors at any point.
As a side note, whenever there is a sequence gap, the next sequence
number received by syncsub appears to come from the start of the next
send() buffer sent by syncpub (confirmed by 'truss syncpub')
Any suggestions?
Alex Keahan
Run as follows:
$ ./syncpub &
$ ./syncsub A &
$ ./syncsub B &
You should see something similar to this:
$ ./syncpub &
[1] 6611
$ Waiting for subscribers
$ ./syncsub A &
[2] 6612
$ ./syncsub B &
[3] 6613
$ Broadcasting messages
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
A: received 1000 updates
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
B: received 1000 updates
[2]- Done ./syncsub A
[3]+ Done ./syncsub B
syncpub.c:
#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>
// Wait for 2 subscribers
#define SUBSCRIBERS_EXPECTED 2
int
main(int argc, char *argv[])
{
void *context = zmq_ctx_new();
if (context == 0) {
fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
return 1;
}
int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
if (rc != 0) {
fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
return 2;
}
// Socket to talk to clients
void *publisher = zmq_socket(context, ZMQ_PUB);
if (publisher == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_PUB) failed: %s\n",
zmq_strerror(errno));
return 4;
}
rc = zmq_bind (publisher, "tcp://*:5561");
if (rc != 0) {
fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
return 5;
}
// Socket to receive signals
void *syncservice = zmq_socket (context, ZMQ_REP);
if (syncservice == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_REP) failed: %s\n",
zmq_strerror(errno));
return 6;
}
rc = zmq_bind (syncservice, "tcp://*:5562");
if (rc != 0) {
fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
return 7;
}
// Get synchronization from subscribers
printf ("Waiting for subscribers\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
char tmp[24];
// - wait for synchronization request
int size = zmq_recv(syncservice, tmp, sizeof(tmp), 0);
if (size < 0) {
fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
return 8;
}
// - send synchronization reply
rc = zmq_send(syncservice, "", 0, 0);
if (rc < 0) {
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 9;
}
subscribers++;
}
// Now broadcast exactly 100,000 updates followed by END
printf ("Broadcasting messages\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 100000; update_nbr++) {
char buf[24];
int len;
sprintf(buf, "Msg #%d", update_nbr+1);
len = strlen(buf);
int size = zmq_send(publisher, buf, len, 0);
if (size != len) {
if (errno == EAGAIN) {
update_nbr--;
continue;
}
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 10;
}
}
int size = zmq_send(publisher, "END", 3, 0);
if (size != 3) {
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 11;
}
sleep(10);
zmq_close(publisher);
zmq_close(syncservice);
zmq_term (context);
return 0;
}
syncsub.c:
#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>
int
main(int argc, char *argv[])
{
const char *prefix = (argc > 1 ? argv[1] : "");
void *context = zmq_ctx_new();
if (context == 0) {
fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
return 1;
}
int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
if (rc != 0) {
fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
return 2;
}
// First, connect our subscriber socket
void *subscriber = zmq_socket(context, ZMQ_SUB);
if (subscriber == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_SUB) failed: %s\n",
zmq_strerror(errno));
return 4;
}
rc = zmq_connect(subscriber, "tcp://localhost:5561");
if (rc != 0) {
fprintf(stderr, "Error: zmq_connect failed: %s\n", zmq_strerror(errno));
return 5;
}
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
if (rc != 0) {
fprintf(stderr, "Error: zmq_setsockopt failed: %s\n", zmq_strerror(errno));
return 6;
}
// 0MQ is so fast, we need to wait a whileâ¦
sleep (1);
// Second, synchronize with publisher
void *syncclient = zmq_socket (context, ZMQ_REQ);
if (syncclient == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_REQ) failed: %s\n",
zmq_strerror(errno));
return 7;
}
rc = zmq_connect (syncclient, "tcp://localhost:5562");
if (rc != 0) {
fprintf(stderr, "Error: zmq_connect failed: %s\n", zmq_strerror(errno));
return 8;
}
// - send a synchronization request
rc = zmq_send(syncclient, "", 0, 0);
if (rc < 0) {
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 9;
}
// - wait for synchronization reply
char buf[24];
int size = zmq_recv(syncclient, buf, sizeof(buf), 0);
if (size < 0) {
fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
return 10;
}
// Third, get our updates and report how many we got
int update_nbr = 0;
while (1) {
size = zmq_recv(subscriber, buf, sizeof(buf), 0);
if (size < 0) {
fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
return 11;
}
// Make sure the message numbers match
char refbuf[24];
sprintf(refbuf, "Msg #%d", update_nbr+1);
if (memcmp(buf, refbuf, strlen(refbuf)) != 0) {
printf("Message mismatch: received '%*.*s' expected '%s'\n",
size, size, buf, refbuf);
break;
}
if (memcmp(buf, "END", 3) == 0) {
break;
}
update_nbr++;
}
printf ("%s: received %d updates\n", prefix, update_nbr);
zmq_close (subscriber);
zmq_close (syncclient);
zmq_term (context);
return 0;
}
More information about the zeromq-dev
mailing list