[zeromq-dev] syncpub/syncsub example losing messages (ZMQ 3.2.0-rc1, Solaris 11)

Alex Keahan akeahan at gbtradingllc.com
Wed Jul 18 01:23:32 CEST 2012


ZMQ 3.2.0-rc1 was built with the latest Sun Studio C/C++ compilers,
with CFLAGS and CXXFLAGS=-fast -library=stlport4.

$ uname -a
SunOS xxxxxx 5.11 11.0 i86pc i386 i86pc
$ CC -V
CC: Sun C++ 5.12 SunOS_i386 2011/11/16
$ cc -V
cc: Sun C 5.12 SunOS_i386 2011/11/16


The syncpub/syncsub example from the guide was adapted to 3.2.0 (see
the code below).

The original version of syncsub would simply get stuck in zmq_recv().
 I added extra code to detect sequence gaps and break out of the loop
whenever there's a discrepancy; now all syncsubs print "received N
updates" where N is somewhat random.   Slowing down the producer fixes
the problem.   Zmq reports no errors at any point.

As a side note, whenever there is a sequence gap, the next sequence
number received by syncsub appears to come from the start of the next
send() buffer sent by syncpub (confirmed by 'truss syncpub')

Any suggestions?

Alex Keahan


Run as follows:

$ ./syncpub &
$ ./syncsub A &
$ ./syncsub B &

You should see something similar to this:

$ ./syncpub &
[1] 6611
$ Waiting for subscribers

$ ./syncsub A &
[2] 6612
$ ./syncsub B &
[3] 6613
$ Broadcasting messages
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
A: received 1000 updates
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
B: received 1000 updates

[2]-  Done                    ./syncsub A
[3]+  Done                    ./syncsub B


syncpub.c:

#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>

// Wait for 2 subscribers
#define SUBSCRIBERS_EXPECTED 2

int
main(int argc, char *argv[])
{
  void *context = zmq_ctx_new();
  if (context == 0) {
    fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
    return 1;
  }
  int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
    return 2;
  }

  // Socket to talk to clients
  void *publisher = zmq_socket(context, ZMQ_PUB);
  if (publisher == 0) {
    fprintf(stderr, "Error: zmq_socket(ZMQ_PUB) failed: %s\n",
zmq_strerror(errno));
    return 4;
  }

  rc = zmq_bind (publisher, "tcp://*:5561");
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
    return 5;
  }

  // Socket to receive signals
  void *syncservice = zmq_socket (context, ZMQ_REP);
  if (syncservice == 0) {
    fprintf(stderr, "Error: zmq_socket(ZMQ_REP) failed: %s\n",
zmq_strerror(errno));
    return 6;
  }

  rc = zmq_bind (syncservice, "tcp://*:5562");
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
    return 7;
  }

  // Get synchronization from subscribers
  printf ("Waiting for subscribers\n");
  int subscribers = 0;
  while (subscribers < SUBSCRIBERS_EXPECTED) {
    char tmp[24];

    // - wait for synchronization request
    int size = zmq_recv(syncservice, tmp, sizeof(tmp), 0);
    if (size < 0) {
      fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
      return 8;
    }

    // - send synchronization reply
    rc = zmq_send(syncservice, "", 0, 0);
    if (rc < 0) {
      fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
      return 9;
    }

    subscribers++;
  }
  // Now broadcast exactly 100,000 updates followed by END
  printf ("Broadcasting messages\n");
  int update_nbr;
  for (update_nbr = 0; update_nbr < 100000; update_nbr++) {
    char buf[24];
    int len;
    sprintf(buf, "Msg #%d", update_nbr+1);
    len = strlen(buf);

    int size = zmq_send(publisher, buf, len, 0);
    if (size != len) {
      if (errno == EAGAIN) {
        update_nbr--;
        continue;
      }
      fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
      return 10;
    }
  }

  int size = zmq_send(publisher, "END", 3, 0);
  if (size != 3) {
    fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
    return 11;
  }

  sleep(10);

  zmq_close(publisher);
  zmq_close(syncservice);
  zmq_term (context);

  return 0;
}


syncsub.c:


#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>

int
main(int argc, char *argv[])
{
  const char *prefix = (argc > 1 ? argv[1] : "");

  void *context = zmq_ctx_new();
  if (context == 0) {
    fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
    return 1;
  }
  int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
    return 2;
  }

  // First, connect our subscriber socket
  void *subscriber = zmq_socket(context, ZMQ_SUB);
  if (subscriber == 0) {
    fprintf(stderr, "Error: zmq_socket(ZMQ_SUB) failed: %s\n",
zmq_strerror(errno));
    return 4;
  }

  rc = zmq_connect(subscriber, "tcp://localhost:5561");
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_connect failed: %s\n", zmq_strerror(errno));
    return 5;
  }

  rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_setsockopt failed: %s\n", zmq_strerror(errno));
    return 6;
  }

  // 0MQ is so fast, we need to wait a whileâ¦
  sleep (1);

  // Second, synchronize with publisher
  void *syncclient = zmq_socket (context, ZMQ_REQ);
  if (syncclient == 0) {
    fprintf(stderr, "Error: zmq_socket(ZMQ_REQ) failed: %s\n",
zmq_strerror(errno));
    return 7;
  }

  rc = zmq_connect (syncclient, "tcp://localhost:5562");
  if (rc != 0) {
    fprintf(stderr, "Error: zmq_connect failed: %s\n", zmq_strerror(errno));
    return 8;
  }

  // - send a synchronization request
  rc = zmq_send(syncclient, "", 0, 0);
  if (rc < 0) {
    fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
    return 9;
  }

  // - wait for synchronization reply
  char buf[24];
  int size = zmq_recv(syncclient, buf, sizeof(buf), 0);
  if (size < 0) {
    fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
    return 10;
  }

  // Third, get our updates and report how many we got
  int update_nbr = 0;
  while (1) {
    size = zmq_recv(subscriber, buf, sizeof(buf), 0);
    if (size < 0) {
      fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
      return 11;
    }

    // Make sure the message numbers match
    char refbuf[24];
    sprintf(refbuf, "Msg #%d", update_nbr+1);

    if (memcmp(buf, refbuf, strlen(refbuf)) != 0) {
      printf("Message mismatch: received '%*.*s' expected '%s'\n",
size, size, buf, refbuf);
      break;
    }

    if (memcmp(buf, "END", 3) == 0) {
      break;
    }
    update_nbr++;
  }
  printf ("%s: received %d updates\n", prefix, update_nbr);

  zmq_close (subscriber);
  zmq_close (syncclient);
  zmq_term (context);

  return 0;
}



More information about the zeromq-dev mailing list