[zeromq-dev] Question about the PUBSUB design pattern when server restarts.
Joshua Foster
jhawk28 at gmail.com
Fri Nov 12 00:47:48 CET 2010
I'm not seeing the issue in my environment, but I am using jzmq (Java)
and the latest from maint. To make sure your code isn't at fault, try
setting the high water mark on the publisher. Also, try closing the
socket when the subscriber closes.
Joshua
On 11/11/2010 5:11 PM, Janak Raja wrote:
> Hello Joshua,
>
> Thanks for your response. I donot care when subscriber gets the data
> as sequence for my publisher is infinite. Meaning no Start / no End.
>
> How do I close this previous queue ?
> Even though I stop/start publisher, does it still maintain a state
> of the previously connected subscriber and their queue ?
> Any suggestion on how to solve this would be very helpful ?
> Thanks,
> Janak
>
> ------------------------------------------------------------------------
> *From:* Joshua Foster <jhawk28 at gmail.com>
> *To:* ZeroMQ development list <zeromq-dev at lists.zeromq.org>
> *Sent:* Thu, November 11, 2010 1:19:27 PM
> *Subject:* Re: [zeromq-dev] Question about the PUBSUB design pattern
> when server restarts.
>
> If you want the subscriber to pick up where it last left off, be sure
> to set the identity. The publisher is probably creating a new queue
> with the "second" subscriber, but leaving the previous queue open. The
> previous queue is the one that continues to grow (causing memory to
> increase).
>
> Joshua
>
> On 11/11/2010 1:53 PM, Janak Raja wrote:
>> Hello,
>>
>> I recently came across zeroMQ and wanted to understand on what will
>> be the behavior for PUBSUB design pattern when PUB goes down for more
>> than 2 minutes.
>>
>> I tried wu example with a little modification to send packets
>> continuously. Please let me know what I am doing wrong in this.
>>
>> Do I have to call connect from client side after every s_recv call ?
>> Currently, I see that if I dont call connect on SUB and PUB stops
>> and 2 minutes later starts again, memory usage on pub keeps on
>> increasing ? May be, its because it might be keeping it in queue for
>> the sub to get the data.
>>
>> Please advice,
>>
>> Here is the Publisher code:
>> //
>> // Weather update server
>> // Binds PUB socket to tcp://*:5556
>> // Publishes random weather updates
>> //
>> #include "zhelpers.h"
>>
>> int main () {
>> // Prepare our context and publisher
>> void *context = zmq_init (1);
>> void *publisher = zmq_socket (context, ZMQ_PUB);
>> zmq_bind (publisher, "tcp://*:5556");
>> zmq_bind (publisher, "ipc://weather.ipc");
>>
>> // Initialize random number generator
>> srandom ((unsigned) time (NULL));
>> while (1) {
>> // Get values that will fool the boss
>> int zipcode, temperature, relhumidity;
>> zipcode = within (100000);
>> temperature = within (215) - 80;
>> relhumidity = within (50) + 10;
>>
>> // Send message to all subscribers
>> char update [20];
>> sprintf (update, "%05d %d %d", zipcode, temperature,
>> relhumidity);
>> s_send (publisher, update);
>> }
>> zmq_close (publisher);
>> zmq_term (context);
>> return 0;
>> }
>>
>>
>>
>>
>> Subscriber code:
>> //
>> // Weather update client
>> // Connects SUB socket to tcp://localhost:5556
>> // Collects weather updates and finds avg temp in zipcode
>> //
>> #include "zhelpers.h"
>>
>> int main (int argc, char *argv[])
>> {
>> void *context = zmq_init (1);
>>
>> // Socket to talk to server
>> printf ("Collecting updates from weather server...\n");
>> void *subscriber = zmq_socket (context, ZMQ_SUB);
>> zmq_connect (subscriber, "tcp://localhost:5556");
>>
>> // Subscribe to zipcode, default is NYC, 10001
>> char *filter = (argc > 1)? argv [1]: "10001 ";
>> zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>> while(1)
>> {
>> // Process 100 updates
>> int update_nbr;
>> long total_temp = 0;
>> for (update_nbr = 0; update_nbr < 100; update_nbr++) {
>> char *string = s_recv (subscriber);
>> int zipcode, temperature, relhumidity;
>> sscanf (string, "%d %d %d",
>> &zipcode, &temperature, &relhumidity);
>> total_temp += temperature;
>> free (string);
>> }
>> printf ("Average temperature for zipcode '%s' was %dF\n",
>> filter, (int) (total_temp / update_nbr));
>> }
>> zmq_close (subscriber);
>> zmq_term (context);
>> return 0;
>> }
>>
>>
>> Thanks,
>> Janak
>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20101111/a6bcfb73/attachment.htm>
More information about the zeromq-dev
mailing list