[zeromq-dev] pub sub memory leak problem

david starkweather starkdg at comcast.net
Sat Aug 7 20:29:07 CEST 2010


Hi, 

As it turns out, it seems this is not a zeromq issue at all.  But an
issue with pthreads. Sorry for the confusion. I just didnt want to leave
this thread hanging.

I included the code that made this leak happen.  Without the thread
creation, the pub/sub works fine with no leaks.  It's when a new thread
is created each time, the resources dont get released back.  (I think
creating the threads in a "detached" state might work.)

 


#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>


int main(int argc, char **argv){
    void *ctx = zmq_init(1);
    if (ctx == NULL){
        printf("unable to create zmq context\n");
        exit(1);
    }
    void *skt = zmq_socket(ctx, ZMQ_PUB);
    if (skt == NULL){
        printf("unable to get socket\n");
        exit(1);
    }

    int rc = zmq_bind(skt, "tcp://lo:4000");
    if (rc){
        printf("unable to bind to address tcp://lo:4000\n");
        exit(1);
    }

    static char *topic_str = "MSG";
    srand(198398);

    zmq_msg_t init_msg, cmd_msg, nbframes_msg, frames_msg;
    uint8_t cmd;
    uint32_t nbframes;
    uint32_t *frames;

    sleep(10);

    unsigned int i;
    while (1){
        zmq_msg_init_data(&init_msg, topic_str, strlen(topic_str)+1,
NULL, NULL);
        zmq_send(skt, &init_msg, ZMQ_SNDMORE);
        zmq_msg_close(&init_msg);

        zmq_msg_init_size(&cmd_msg, sizeof(uint8_t));
        cmd = 1;
        memcpy(zmq_msg_data(&cmd_msg), &cmd, sizeof(uint8_t));
        zmq_send(skt, &cmd_msg, ZMQ_SNDMORE);
        zmq_msg_close(&cmd_msg);

        nbframes = rand()%20000;
        zmq_msg_init_size(&nbframes_msg, sizeof(uint32_t));
        memcpy(zmq_msg_data(&nbframes_msg), &nbframes,
sizeof(uint32_t));
        zmq_send(skt, &nbframes_msg, ZMQ_SNDMORE);
        zmq_msg_close(&nbframes_msg);

        frames = (uint32_t*)malloc(nbframes*sizeof(uint32_t));

        for (i=0;i<nbframes;i++){
            frames[i] = rand();
        }

        zmq_msg_init_data(&frames_msg, frames,
nbframes*sizeof(uint32_t), NULL, NULL);
        zmq_send(skt, &frames_msg, 0);
        zmq_msg_close(&frames_msg);
        free(frames);

	sleep(2);

    }
    zmq_close(skt);
    zmq_term(ctx);

    return 0;
}

#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>
#include <pthread.h>

void* dowork(void *arg){

    fprintf(stdout,"in do work\n");


    return NULL;
}

int main(int argc, char **argv){
    void *ctx = zmq_init(1);
    if (ctx == NULL){
        printf("unable to create zmq context\n");
        exit(1);
    }

    void *skt = zmq_socket(ctx, ZMQ_SUB);
    if (skt == NULL){
        printf("unable to get socket\n");
        exit(1);
    }

    int rc = zmq_connect(skt, "tcp://localhost:4000");
    if (rc){
        printf("unable to connect to address tcp://lo:4000\n");
        exit(1);
    }

    zmq_setsockopt(skt, ZMQ_SUBSCRIBE, "", 0);

    zmq_msg_t init_msg, cmd_msg, nbframes_msg, frames_msg;
    uint8_t cmd;
    uint32_t nbframes;
    char *init = NULL;
    unsigned int i = 0;

    pthread_t worker_thr;
    while (1){
        zmq_msg_init(&init_msg);
        zmq_recv(skt, &init_msg, 0);
        init = (char*)zmq_msg_data(&init_msg);
        printf("recieved: %s\n", init);
        zmq_msg_close(&init_msg);

        zmq_msg_init(&cmd_msg);
        zmq_recv(skt, &cmd_msg, 0);
        memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
        printf("cmd = %u\n", cmd);
        zmq_msg_close(&cmd_msg);

        zmq_msg_init(&nbframes_msg);
        zmq_recv(skt, &nbframes_msg, 0);
        memcpy(&nbframes, zmq_msg_data(&nbframes_msg),
sizeof(uint32_t));
        printf("nbframes = %u\n", nbframes);
        zmq_msg_close(&nbframes_msg);

        zmq_msg_init(&frames_msg);
        zmq_recv(skt, &frames_msg, 0);
        printf("frames: %ld length array\n",
zmq_msg_size(&frames_msg)/sizeof(uint32_t));
        zmq_msg_close(&frames_msg);

	
	rc = pthread_create(&worker_thr, NULL, dowork, NULL);
	if (rc != 0){
	    printf("unable to create worker thread\n");
	}

        i++;
    }
 
    zmq_close(skt);
    zmq_term(ctx);

    return 0;
}

On Thu, 2010-08-05 at 15:00 -0400, david starkweather wrote:
> just came across an interesting result.  As noted from the previous
> code,  valgrind did not detect any memory leak.  However, that was just
> one message.  In this link,  there is code that loops through many
> messages.  When I add the line at the end of the loop,  sleep(1), I can
> run the code many times in a loop without any apparent memory leak.  But
> wihout that line,  the memory quickly climbs.
> 
> http://pastebin.com/DjGbx6kZ
> 
> 
> On Thu, 2010-08-05 at 13:44 -0400, david starkweather wrote:
> > I just ran it in valgrind and put the results in the pastebin as well:
> > 
> > http://pastebin.com/2M3yYhmm
> > 
> > 
> > On Thu, 2010-08-05 at 19:02 +0200, Pieter Hintjens wrote:
> > > On Thu, Aug 5, 2010 at 6:50 PM, Matt Weinstein <matt_weinstein at yahoo.com> wrote:
> > > 
> > > > missing free_fn for zmq_msg_init_data?
> > > 
> > > Nope, it's using static data plus the memory leak is in the
> > > subscriber, presumably.
> > > 
> > > David, the code looks fine.  Can you reduce this to the simplest possible
> > > case that demonstrates a memory leak?
> > > 
> > > -Pieter
> > > _______________________________________________
> > > zeromq-dev mailing list
> > > zeromq-dev at lists.zeromq.org
> > > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> > 
> > _______________________________________________
> > zeromq-dev mailing list
> > zeromq-dev at lists.zeromq.org
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> 
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev




More information about the zeromq-dev mailing list