[zeromq-dev] Question about the PUBSUB design pattern when server restarts.

Joshua Foster jhawk28 at gmail.com
Thu Nov 11 22:19:27 CET 2010


If you want the subscriber to pick up where it last left off, be sure to 
set the identity. The publisher is probably creating a new queue with 
the "second" subscriber, but leaving the previous queue open. The 
previous queue is the one that continues to grow (causing memory to 
increase).

Joshua

On 11/11/2010 1:53 PM, Janak Raja wrote:
> Hello,
>
>   I recently came across zeroMQ and wanted to understand on what will 
> be the behavior for PUBSUB design pattern when PUB goes down for more 
> than 2 minutes.
>
>   I tried wu example with a little modification to send packets 
> continuously. Please let me know what I am doing wrong in this.
>
>   Do I have to call connect from client side after every s_recv call ?
>   Currently, I see that if I dont call connect on SUB and PUB stops 
> and 2 minutes later starts again, memory usage on pub keeps on 
> increasing ? May be, its because it might be keeping it in queue for 
> the sub to get the data.
>
>   Please advice,
>
>   Here is the Publisher code:
>      //
> //  Weather update server
> //  Binds PUB socket to tcp://*:5556
> //  Publishes random weather updates
> //
> #include "zhelpers.h"
>
> int main () {
>     //  Prepare our context and publisher
>     void *context = zmq_init (1);
>     void *publisher = zmq_socket (context, ZMQ_PUB);
>     zmq_bind (publisher, "tcp://*:5556");
>     zmq_bind (publisher, "ipc://weather.ipc");
>
>     //  Initialize random number generator
>     srandom ((unsigned) time (NULL));
>     while (1) {
>         //  Get values that will fool the boss
>         int zipcode, temperature, relhumidity;
>         zipcode     = within (100000);
>         temperature = within (215) - 80;
>         relhumidity = within (50) + 10;
>
>         //  Send message to all subscribers
>         char update [20];
>         sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
>         s_send (publisher, update);
>     }
>     zmq_close (publisher);
>     zmq_term (context);
>     return 0;
> }
>
>
>
>
> Subscriber code:
>   //
> //  Weather update client
> //  Connects SUB socket to tcp://localhost:5556
> //  Collects weather updates and finds avg temp in zipcode
> //
> #include "zhelpers.h"
>
> int main (int argc, char *argv[])
> {
>     void *context = zmq_init (1);
>
>     //  Socket to talk to server
>     printf ("Collecting updates from weather server...\n");
>     void *subscriber = zmq_socket (context, ZMQ_SUB);
>     zmq_connect (subscriber, "tcp://localhost:5556");
>
>     //  Subscribe to zipcode, default is NYC, 10001
>     char *filter = (argc > 1)? argv [1]: "10001 ";
>     zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>     while(1)
>     {
>        //  Process 100 updates
>        int update_nbr;
>        long total_temp = 0;
>        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
>            char *string = s_recv (subscriber);
>            int zipcode, temperature, relhumidity;
>            sscanf (string, "%d %d %d",
> &zipcode, &temperature, &relhumidity);
>            total_temp += temperature;
>            free (string);
>        }
>        printf ("Average temperature for zipcode '%s' was %dF\n",
>            filter, (int) (total_temp / update_nbr));
>     }
>     zmq_close (subscriber);
>     zmq_term (context);
>     return 0;
> }
>
>
> Thanks,
> Janak
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20101111/96b745e8/attachment.htm>


More information about the zeromq-dev mailing list