[zeromq-dev] memory leak in PUB/SUB sockets

Daniel Sentenac sentenac at ego-gw.it
Tue Oct 5 10:32:30 CEST 2010


I am reposting the test case:

/*
  The server part
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>

int main (int argc, char *argv [])
{
    const char *bind_to;
    int message_size;
    void *ctx;
    void *s;
    int rc;
    zmq_msg_t msg;
    const uint64_t high_water_mark = 1; /* Limit on the maximum number 
of messages */
 

    if (argc != 3) {
        printf ("usage: remote_thr <bind-to> <message-size>\n");
        return 1;
    }
    bind_to = argv [1];
    message_size = atoi (argv [2]);

    ctx = zmq_init (1);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_PUB);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
    rc = zmq_setsockopt (s, ZMQ_HWM, &high_water_mark,
             sizeof high_water_mark);
    assert (rc == 0);
    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }
    while (1) {
     
      rc = zmq_msg_init_size (&msg, message_size);
      if (rc != 0) {
    printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
    return -1;
      }

      rc = zmq_send (s, &msg, 0);
      if (rc != 0) {
    printf ("error in zmq_send: %s\n", zmq_strerror (errno));
    return -1;
      }
      rc = zmq_msg_close (&msg);
      if (rc != 0) {
    printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
    return -1;
      }
      /* we limit the frequency number of message sent here
     (about 10 messages/sec)*/
      usleep(100000);
    }
   
    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}

/*
    The client part
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <assert.h>
#include <unistd.h>

/*
 * Subscriber ("client") side of the PUB/SUB test case.
 *
 * Usage: <program> <connect-to>
 *
 * Creates a ZMQ_SUB socket subscribed to every message (empty prefix), sets
 * ZMQ_HWM to 1, connects to <connect-to> and then receives messages forever,
 * printing each message's size and sleeping 200 ms per message so that the
 * receiver is slower than the sender.
 *
 * Returns 1 on a usage error and -1 when any 0MQ call fails.
 */
int main (int argc, char *argv [])
{
    const char *connect_to;
    void *ctx;
    void *s;
    int rc;
    zmq_msg_t msg;
    size_t message_size;

    /* Limit on the maximum number of queued messages. */
    const uint64_t high_water_mark = 1;

    if (argc != 2) {
        printf ("usage: local_thr <connect-to>\n");
        return 1;
    }
    connect_to = argv [1];

    ctx = zmq_init (1);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_SUB);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    /* An empty subscription prefix matches every incoming message. */
    rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE , "", 0);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

    /* Add your socket options here.
       For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. */
    rc = zmq_setsockopt (s, ZMQ_HWM, &high_water_mark,
             sizeof high_water_mark);
    /* Previously this result was overwritten by zmq_connect without being
       checked, so a failure to set the high-water mark went unnoticed. */
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        return -1;
    }

    while (1) {
        rc = zmq_msg_init (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
            return -1;
        }

        rc = zmq_recv (s, &msg, 0);
        if (rc != 0) {
            /* Report first (closing may overwrite errno), then release the
               message initialised above. */
            printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
            zmq_msg_close (&msg);
            return -1;
        }

        /* Keep the size as size_t and print it with %zu instead of
           narrowing it to int. */
        message_size = zmq_msg_size (&msg);
        printf ("message size: %zu [B]\n", message_size);

        /* Simulate processing that makes the receiver slower than the
           sender (about 5 messages/sec are handled). */
        usleep (200000);

        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            return -1;
        }
    }

    /* Not reached: the loop above only exits through an error return. Kept
       to show the intended shutdown sequence. */
    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}



More information about the zeromq-dev mailing list