[zeromq-dev] PUB/SUB overload: duplicate messages
Zoufal Andreas
Andreas.Zoufal at ait.ac.at
Thu Jan 20 14:49:27 CET 2011
Hi, this is my reduced test case, I hope it is better now.
System: PC, Ubuntu 10.04 32 Bit, 0MQ 2.1.0
Description: one server thread sends 10 messages to a PUB socket as fast as possible. One client is connected as SUB to this socket and receives *always* the correct number of messages, but not always with the right content. Sometimes messages seem to be duplicated (overwritten?). Inserting a delay between publishing messages avoids this problem.
My question is: is this an expected behavior on system overload or a bug? Is there something I can do on the client side to improve the robustness?
Kind regards,
Andi
-------------OOO---------------
#define _BSD_SOURCE
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <zmq.h>
void *zmqContext = NULL;
typedef struct MESSAGE_T {
int sequence;
char szText[1000];
} MESSAGE;
static void *threadServer (void *pArg) {
void *zmqSocket;
zmq_msg_t zmqMsg;
MESSAGE msg;
int i;
printf("Server: started\n");
zmqSocket = zmq_socket(zmqContext, ZMQ_PUB);
zmq_bind(zmqSocket, "ipc://pub.ipc");
sleep(1); // some time for client
for (i = 0; i < 10; i += 1) {
memset(&msg, -1, sizeof(msg));
msg.sequence = i;
strcpy(msg.szText, "Dummy text...");
zmq_msg_init_data(&zmqMsg, &msg, sizeof(msg), NULL, NULL);
zmq_send(zmqSocket, &zmqMsg, 0);
zmq_msg_close(&zmqMsg);
printf("Server: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
// usleep(1000); // delay between messages
}
sleep(1); // some time for client
zmq_close(zmqSocket);
printf("Server: finished\n");
return pArg;
}
static void *threadClient (void *pArg) {
void *zmqSocket;
zmq_msg_t zmqMsg;
MESSAGE msg;
int i;
printf("Client: started\n");
zmqSocket = zmq_socket(zmqContext, ZMQ_SUB);
zmq_setsockopt (zmqSocket, ZMQ_SUBSCRIBE, "", 0);
zmq_connect(zmqSocket, "ipc://pub.ipc");
for (i = 0; i < 10; i += 1) {
zmq_msg_init(&zmqMsg);
zmq_recv(zmqSocket, &zmqMsg, 0);
memcpy(&msg, zmq_msg_data(&zmqMsg), sizeof(msg));
zmq_msg_close(&zmqMsg);
printf("Client: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
}
zmq_close(zmqSocket);
printf("Client: finished\n");
return pArg;
}
int main(void)
{
pthread_t client = 0;
pthread_t server = 0;
zmqContext = zmq_init(1);
pthread_create(&client, NULL, threadClient, NULL);
sleep(1);
pthread_create(&server, NULL, threadServer, NULL);
pthread_join(server, NULL);
pthread_join(client, NULL);
zmq_term(zmqContext);
return 0;
}
More information about the zeromq-dev
mailing list