[zeromq-dev] Question about the PUBSUB design pattern when server restarts.
Janak Raja
janak_raja at yahoo.com
Thu Nov 11 19:53:49 CET 2010
Hello,
I recently came across zeroMQ and wanted to understand on what will be the
behavior for PUBSUB design pattern when PUB goes down for more than 2 minutes.
I tried wu example with a little modification to send packets continuously.
Please let me know what I am doing wrong in this.
Do I have to call connect from client side after every s_recv call ?
Currently, I see that if I dont call connect on SUB and PUB stops and 2
minutes later starts again, memory usage on pub keeps on increasing ? May be,
its because it might be keeping it in queue for the sub to get the data.
Please advice,
Here is the Publisher code:
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#include "zhelpers.h"
int main () {
// Prepare our context and publisher
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5556");
zmq_bind (publisher, "ipc://weather.ipc");
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = within (100000);
temperature = within (215) - 80;
relhumidity = within (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_term (context);
return 0;
}
Subscriber code:
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"
int main (int argc, char *argv[])
{
void *context = zmq_init (1);
// Socket to talk to server
printf ("Collecting updates from weather server...\n");
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
char *filter = (argc > 1)? argv [1]: "10001 ";
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
while(1)
{
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Average temperature for zipcode '%s' was %dF\n",
filter, (int) (total_temp / update_nbr));
}
zmq_close (subscriber);
zmq_term (context);
return 0;
}
Thanks,
Janak
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20101111/ab265b23/attachment.htm>
More information about the zeromq-dev
mailing list