[zeromq-dev] PUB/SUB overload: duplicate messages

Pieter Hintjens ph at imatix.com
Thu Jan 20 15:14:49 CET 2011


Zoufal,

Thank for stripping down the code, it's easier to read now and clear
what the problem is.

You are using a single static buffer for all messages. As 0MQ sends
messages, your code overwrites the buffer with new data and messages
that are queued but not yet sent will refer to the new, not original,
data.  The delay 'fixes' things but is obviously not a solution.

In general zmq_msg_init_data() is vulnerable to this problem because
it sets a reference to memory but does not copy the memory.

The correct approach is to allocate the necessary memory for every
message you send, and allow 0MQ to free that memory once the message
has been sent.  You can also use zmq_msg_init_size() to allocate
memory per message.  For example this is how the zhelpers.h s_send()
function does it:

----
//  Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    int rc;
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    rc = zmq_send (socket, &message, 0);
    assert (!rc);
    zmq_msg_close (&message);
    return (rc);
}
----

I'd recommend reading the Guide and using the helper functions that
the examples use, at least until you are much further advanced and
start to be doing _very_ high performance work.

Regards
Pieter Hintjens
iMatix


On Thu, Jan 20, 2011 at 2:49 PM, Zoufal Andreas
<Andreas.Zoufal at ait.ac.at> wrote:
> Hi, this is my reduced test case, I hope it is better now.
> System: PC, Ubuntu 10.04 32 Bit, 0MQ 2.1.0
>
> Description: one server thread sends 10 messages to a PUB socket as fast as possible. One client is connected as SUB to this socket and receives *always* the correct number of messages, but not always with the right content. Sometimes messages seem to be duplicated (overwritten?). Inserting a delay between publishing messages avoids this problem.
>
> My question is: is this an expected behavior on system overload or a bug? Is there something I can do on the client side to improve the robustness?
>
> Kind regards,
> Andi
>
> -------------OOO---------------
>
> #define _BSD_SOURCE
>
> #include <stdio.h>
> #include <string.h>
> #include <pthread.h>
> #include <unistd.h>
>
> #include <zmq.h>
>
> void *zmqContext = NULL;
>
> typedef struct MESSAGE_T {
>  int sequence;
>  char szText[1000];
> } MESSAGE;
>
> static void *threadServer (void *pArg) {
>  void *zmqSocket;
>  zmq_msg_t zmqMsg;
>  MESSAGE msg;
>  int i;
>
>  printf("Server: started\n");
>  zmqSocket = zmq_socket(zmqContext, ZMQ_PUB);
>  zmq_bind(zmqSocket, "ipc://pub.ipc");
>
>  sleep(1); // some time for client
>
>  for (i = 0; i < 10; i += 1) {
>    memset(&msg, -1, sizeof(msg));
>    msg.sequence = i;
>    strcpy(msg.szText, "Dummy text...");
>    zmq_msg_init_data(&zmqMsg, &msg, sizeof(msg), NULL, NULL);
>    zmq_send(zmqSocket, &zmqMsg, 0);
>    zmq_msg_close(&zmqMsg);
>    printf("Server: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
>
> //    usleep(1000); // delay between messages
>  }
>
>  sleep(1); // some time for client
>
>  zmq_close(zmqSocket);
>  printf("Server: finished\n");
>  return pArg;
> }
>
> static void *threadClient (void *pArg) {
>  void *zmqSocket;
>  zmq_msg_t zmqMsg;
>  MESSAGE msg;
>  int i;
>
>  printf("Client: started\n");
>  zmqSocket = zmq_socket(zmqContext, ZMQ_SUB);
>  zmq_setsockopt (zmqSocket, ZMQ_SUBSCRIBE, "", 0);
>  zmq_connect(zmqSocket, "ipc://pub.ipc");
>
>  for (i = 0; i < 10; i += 1) {
>    zmq_msg_init(&zmqMsg);
>    zmq_recv(zmqSocket, &zmqMsg, 0);
>    memcpy(&msg, zmq_msg_data(&zmqMsg), sizeof(msg));
>    zmq_msg_close(&zmqMsg);
>    printf("Client: message(%d) = [%d, %s]\n", i, msg.sequence, msg.szText);
>  }
>
>  zmq_close(zmqSocket);
>  printf("Client: finished\n");
>  return pArg;
> }
>
> int main(void)
> {
>  pthread_t client = 0;
>  pthread_t server = 0;
>
>  zmqContext = zmq_init(1);
>  pthread_create(&client, NULL, threadClient, NULL);
>  sleep(1);
>  pthread_create(&server, NULL, threadServer, NULL);
>
>  pthread_join(server, NULL);
>  pthread_join(client, NULL);
>  zmq_term(zmqContext);
>  return 0;
> }
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>



More information about the zeromq-dev mailing list