[zeromq-dev] PUB/SUB overload: duplicate messages

Marko Mikulicic marko.mikulicic at isti.cnr.it
Thu Jan 20 15:11:27 CET 2011


Hi Andreas,

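I noticed that you use zmq_msg_init_data to create a message pointing to a
single buffer (the local variable msg), which is then shared between all
outstanding messages. zmq_msg_init_data does not copy the data, so when you
overwrite msg for the next iteration you also change any message that is
still queued.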

With this small change your test code works.

--- bug.c.old 2011-01-20 15:05:16.534210001 +0100
+++ bug.c 2011-01-20 15:05:37.884210001 +0100
@@ -30,7 +30,9 @@
    memset(&msg, -1, sizeof(msg));
    msg.sequence = i;
    strcpy(msg.szText, "Dummy text...");
-   zmq_msg_init_data(&zmqMsg, &msg, sizeof(msg), NULL, NULL);
+   zmq_msg_init_size(&zmqMsg, sizeof(msg));
+   memcpy (zmq_msg_data (&zmqMsg), &msg, sizeof(msg));
+
    zmq_send(zmqSocket, &zmqMsg, 0);
    zmq_msg_close(&zmqMsg);
    printf("Server: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);


My change is not optimal; it just shows where the problem is. Ideally you
would operate directly on the data buffer allocated by zmq, which you get
from zmq_msg_data (&zmqMsg): do your ".sequence =" assignment and your
strcpy directly there, and the extra memcpy goes away.

You can also keep using zmq_msg_init_data, but then you must ensure that the
message body is not reused until the free function is invoked. You pass a
pointer to your own free function as the fourth argument, which is useful if
you want to keep a cache of preconstructed objects (a slab allocator, for
example).
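
A rough sketch of such a cache, under my own assumptions: the names
POOL_SLOTS, SLOT, poolAcquire and poolRelease are made up for illustration,
and the pool is a plain fixed array rather than a real slab allocator. The
only zmq-specific part is that poolRelease has the signature zmq expects for
the free function, void (void *data, void *hint). I left the zmq call itself
in a comment so the block compiles without libzmq.

```c
#include <stdatomic.h>
#include <stddef.h>

#define POOL_SLOTS 16   /* arbitrary size chosen for the sketch */

typedef struct MESSAGE_T {
    int sequence;
    char szText[1000];
} MESSAGE;

typedef struct {
    MESSAGE body;       /* must stay the first member, see poolRelease */
    atomic_int inUse;   /* 0 = free, 1 = owned by an outstanding message */
} SLOT;

/* Static storage is zero-initialised, so every slot starts out free. */
static SLOT pool[POOL_SLOTS];

/* Claim a free slot, or return NULL if all slots are still in flight. */
static MESSAGE *poolAcquire (void)
{
    size_t i;

    for (i = 0; i < POOL_SLOTS; i += 1) {
        int expected = 0;
        if (atomic_compare_exchange_strong (&pool[i].inUse, &expected, 1))
            return &pool[i].body;
    }
    return NULL;
}

/* Free function for zmq_msg_init_data. zmq may call it from a different
 * thread than the sender, which is why the flag is atomic. */
static void poolRelease (void *data, void *hint)
{
    /* body is the first member, so its address is also the slot's. */
    SLOT *slot = (SLOT *) data;

    (void) hint;        /* unused in this sketch */
    atomic_store (&slot->inUse, 0);
}

/* Intended use in the send loop (needs <zmq.h>):
 *
 *   MESSAGE *pMsg = poolAcquire ();
 *   ... fill *pMsg ...
 *   zmq_msg_init_data (&zmqMsg, pMsg, sizeof (*pMsg), poolRelease, NULL);
 */
```

The sender only ever writes to a slot it has just acquired, and a slot
becomes available again only after zmq has called poolRelease for it. If
poolAcquire returns NULL you have more messages in flight than slots, and
you need to decide whether to wait, drop, or fall back to zmq_msg_init_size.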

The documentation states (http://zguide.zeromq.org/chapter:2):

> Do not use zmq_msg_move(3) <http://api.zeromq.org/zmq_msg_move.html>,
> zmq_msg_copy(3) <http://api.zeromq.org/zmq_msg_copy.html>, or
> zmq_msg_init_data(3) <http://api.zeromq.org/zmq_msg_init_data.html> unless
> you read the man pages and know precisely why you need these.


Cheers,
Marko

On 20 January 2011 14:49, Zoufal Andreas <Andreas.Zoufal at ait.ac.at> wrote:

> Hi, this is my reduced test case, I hope it is better now.
> System: PC, Ubuntu 10.04 32 Bit, 0MQ 2.1.0
>
> Description: one server thread sends 10 messages to a PUB socket as fast as
> possible. One client is connected as SUB to this socket and receives
> *always* the correct number of messages, but not always with the right
> content. Sometimes messages seem to be duplicated (overwritten?). Inserting
> a delay between publishing messages avoids this problem.
>
> My question is: is this an expected behavior on system overload or a bug?
> Is there something I can do on the client side to improve the robustness?
>
> Kind regards,
> Andi
>
> -------------OOO---------------
>
> #define _BSD_SOURCE
>
> #include <stdio.h>
> #include <string.h>
> #include <pthread.h>
> #include <unistd.h>
>
> #include <zmq.h>
>
> void *zmqContext = NULL;
>
> typedef struct MESSAGE_T {
>  int sequence;
>  char szText[1000];
> } MESSAGE;
>
> static void *threadServer (void *pArg) {
>  void *zmqSocket;
>  zmq_msg_t zmqMsg;
>  MESSAGE msg;
>  int i;
>
>  printf("Server: started\n");
>  zmqSocket = zmq_socket(zmqContext, ZMQ_PUB);
>  zmq_bind(zmqSocket, "ipc://pub.ipc");
>
>  sleep(1); // some time for client
>
>  for (i = 0; i < 10; i += 1) {
>    memset(&msg, -1, sizeof(msg));
>    msg.sequence = i;
>    strcpy(msg.szText, "Dummy text...");
>    zmq_msg_init_data(&zmqMsg, &msg, sizeof(msg), NULL, NULL);
>    zmq_send(zmqSocket, &zmqMsg, 0);
>    zmq_msg_close(&zmqMsg);
>    printf("Server: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
>
> //    usleep(1000); // delay between messages
>  }
>
>  sleep(1); // some time for client
>
>  zmq_close(zmqSocket);
>  printf("Server: finished\n");
>  return pArg;
> }
>
> static void *threadClient (void *pArg) {
>  void *zmqSocket;
>  zmq_msg_t zmqMsg;
>  MESSAGE msg;
>  int i;
>
>  printf("Client: started\n");
>  zmqSocket = zmq_socket(zmqContext, ZMQ_SUB);
>  zmq_setsockopt (zmqSocket, ZMQ_SUBSCRIBE, "", 0);
>  zmq_connect(zmqSocket, "ipc://pub.ipc");
>
>  for (i = 0; i < 10; i += 1) {
>    zmq_msg_init(&zmqMsg);
>    zmq_recv(zmqSocket, &zmqMsg, 0);
>    memcpy(&msg, zmq_msg_data(&zmqMsg), sizeof(msg));
>    zmq_msg_close(&zmqMsg);
>    printf("Client: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
>  }
>
>  zmq_close(zmqSocket);
>  printf("Client: finished\n");
>  return pArg;
> }
>
> int main(void)
> {
>  pthread_t client = 0;
>  pthread_t server = 0;
>
>  zmqContext = zmq_init(1);
>  pthread_create(&client, NULL, threadClient, NULL);
>  sleep(1);
>  pthread_create(&server, NULL, threadServer, NULL);
>
>  pthread_join(server, NULL);
>  pthread_join(client, NULL);
>  zmq_term(zmqContext);
>  return 0;
> }