[zeromq-dev] PUB/SUB overload: duplicate messages

Zoufal Andreas Andreas.Zoufal at ait.ac.at
Thu Jan 20 14:49:27 CET 2011


Hi, this is my reduced test case, I hope it is better now.
System: PC, Ubuntu 10.04 32 Bit, 0MQ 2.1.0

Description: one server thread sends 10 messages to a PUB socket as fast as possible. One client is connected as SUB to this socket and receives *always* the correct number of messages, but not always with the right content. Sometimes messages seem to be duplicated (overwritten?). Inserting a delay between publishing messages avoids this problem.

My question is: is this an expected behavior on system overload or a bug? Is there something I can do on the client side to improve the robustness?

Kind regards,
Andi

-------------OOO---------------

#define _BSD_SOURCE

#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#include <zmq.h>

void *zmqContext = NULL;

typedef struct MESSAGE_T {
  int sequence;
  char szText[1000];
} MESSAGE;

static void *threadServer (void *pArg) {
  void *zmqSocket;
  zmq_msg_t zmqMsg;
  MESSAGE msg;
  int i;

  printf("Server: started\n");
  zmqSocket = zmq_socket(zmqContext, ZMQ_PUB);
  zmq_bind(zmqSocket, "ipc://pub.ipc");

  sleep(1); // some time for client

  for (i = 0; i < 10; i += 1) {
    memset(&msg, -1, sizeof(msg));
    msg.sequence = i;
    strcpy(msg.szText, "Dummy text...");
    zmq_msg_init_data(&zmqMsg, &msg, sizeof(msg), NULL, NULL);
    zmq_send(zmqSocket, &zmqMsg, 0);
    zmq_msg_close(&zmqMsg);
    printf("Server: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);

//    usleep(1000); // delay between messages
  }

  sleep(1); // some time for client

  zmq_close(zmqSocket);
  printf("Server: finished\n");
  return pArg;
}

static void *threadClient (void *pArg) {
  void *zmqSocket;
  zmq_msg_t zmqMsg;
  MESSAGE msg;
  int i;

  printf("Client: started\n");
  zmqSocket = zmq_socket(zmqContext, ZMQ_SUB);
  zmq_setsockopt (zmqSocket, ZMQ_SUBSCRIBE, "", 0);
  zmq_connect(zmqSocket, "ipc://pub.ipc");

  for (i = 0; i < 10; i += 1) {
    zmq_msg_init(&zmqMsg);
    zmq_recv(zmqSocket, &zmqMsg, 0);
    memcpy(&msg, zmq_msg_data(&zmqMsg), sizeof(msg));
    zmq_msg_close(&zmqMsg);
    printf("Client: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
  }

  zmq_close(zmqSocket);
  printf("Client: finished\n");
  return pArg;
}

int main(void)
{
  pthread_t client = 0;
  pthread_t server = 0;

  zmqContext = zmq_init(1);
  pthread_create(&client, NULL, threadClient, NULL);
  sleep(1);
  pthread_create(&server, NULL, threadServer, NULL);

  pthread_join(server, NULL);
  pthread_join(client, NULL);
  zmq_term(zmqContext);
  return 0;
}



More information about the zeromq-dev mailing list