[zeromq-dev] need help with PUB/SUB

Kenneth Adam Miller kennethadammiller at gmail.com
Fri Mar 6 17:29:58 CET 2015


I notice that in the code you posted, you wait until you get a request on
the sub socket, and then publish on the workers socket. The workers socket
is a pub socket, and just receiving a request on the sub socket doesn't mean
that all workers have connected. Why not, before you start publishing at
all, have an additional rep socket and wait in a loop to receive and
respond to exactly as many workers as you intend to start, before you begin
listening on the sub socket at all? Then what you could do is have the
workers send a req message to that rep socket, and wait for a reply before
anything is sent to the sub socket. I'm not sure what the sub socket is
supposed to do. I suppose you identified the need for it since it's a
many-to-one relationship: you have many workers, one publisher. But you can
replace the sub socket's role in waiting for a worker message with a rep
socket. In the worker, before sending to the rep socket, make a connect
call to the publisher.

On Fri, Mar 6, 2015 at 10:58 AM, <sven.koebnick at t-online.de> wrote:

>  Hi all!
>
>
> After migrating from ZMQ2 to ZMQ4 I had problems with vanishing messages
> (on REP/REQ as well as on PUB/SUB).
>
> I now *managed to build some DIRTY "minimal" testcase* (see below)
> displaying at least >one< of my problems.
>
> The testcase does the following:
>
> - create a router binding to one SUB socket (for clients) and one PUB
> socket (for workers)
>
> - create some workers listening on a SUB socket connected to the router's
> PUB port
>
> - WAIT some seconds !!!
>
> - send 10000 numbers at full speed on a PUB socket connected to the
> router's SUB port
>
> - workers expect numbers 0 to n to be sent as messages in correct order
> and complain about mismatched numbers
>
>
>
> The result shows that *about 1400 to 3000 of 10000 messages get lost*
> before the rest reach the workers.
>
> WHAT AM I DOING WRONG ?
>
>
> It seems to get a *little* better if I
>
> - increase the threads in context with zmq_ctx_set()
>
> - increase the number of workers
>
> - increase the SND and RCV high-water marks using zmq_setsockopt()
>
> but it never gets really good.
>
>
>
> When I send 100,000 messages, the first 3,500 get lost (?!?).
>
> When I send 1000 messages, none arrive ;o(
>
> When I set the SNDHWM and RCVHWM to 0 (zero), which should be "unlimited",
> nothing ever arrives.
>
> many thanks
>
> sven
>
> =====SNIP===output==================================
>
> starting Router ...
>  thread id -1212605632
> starting 10 Workers ...
> finished starting Workers ...
> sending messages ...
> sending messages done
> worker 2 expected message 0, got message 4616
> worker 5 expected message 0, got message 4616
> worker 9 expected message 0, got message 4616
> worker 0 expected message 0, got message 4616
> worker 7 expected message 0, got message 4616
> worker 4 expected message 0, got message 4616
> worker 1 expected message 0, got message 4616
> worker 8 expected message 0, got message 4616
> worker 6 expected message 0, got message 4616
> worker 3 expected message 0, got message 4616
> worker 9 ending after 5383 good messages. last message was 9999
> worker 2 ending after 5383 good messages. last message was 9999
> worker 7 ending after 5383 good messages. last message was 9999
> worker 5 ending after 5383 good messages. last message was 9999
> worker 8 ending after 5383 good messages. last message was 9999
> worker 4 ending after 5383 good messages. last message was 9999
> worker 1 ending after 5383 good messages. last message was 9999
> worker 3 ending after 5383 good messages. last message was 9999
> worker 6 ending after 5383 good messages. last message was 9999
> worker 0 ending after 5383 good messages. last message was 9999
> all own thread have terminated
> router has terminated
>
> =====SNIP===code==================================
>
> #include <stdlib.h>
> #include <stdio.h>
> #include <string.h>  // memcpy, strncmp, strerror
> #include <pthread.h>
> #include <unistd.h>
> #include <zmq.h>
> #include <czmq.h>
> #include <zframe.h>
> #include <string>
>
> int numWorkers=10;
>
> void sethwm(int o,void*s) {
>     int opt;
>     size_t len=4;
>     opt=500000;
>     int rc=zmq_setsockopt(s,o,&opt,len);
>     if (rc!=0)
>         printf("error setting HWM\n");
>     opt=-1;
>     rc=zmq_getsockopt(s,o,&opt,&len);
>     if (rc!=0)
>         printf("error getting HWM\n");
> }
>
> class Router {
>     public: volatile bool leave=false;
>     private:
>     void inline goRouter() {
>         void *clients=zmq_socket(ctx, ZMQ_SUB);
>         sethwm(ZMQ_RCVHWM,clients);
>         void *workers=zmq_socket(ctx, ZMQ_PUB);
>         sethwm(ZMQ_SNDHWM,workers);
>         int rc=zmq_setsockopt(clients,ZMQ_SUBSCRIBE,"",0);
>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>         rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
>         if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>         rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
>         if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>         zmq_pollitem_t items[1];
>         items[0].socket=clients;
>         items[0].events=ZMQ_POLLIN;
>         int currentWorker=-1,nextWorker=0;
>         int workerCount=1;
>         bool lastReceived=false;
>         do {
>             items[0].revents=0;
>             rc=zmq_poll(items,1,10);
>             if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>             if (rc==0) continue;
>             if (items[0].revents) {
>                 // we got a request
>                 zmsg_t* request=zmsg_recv(items[0].socket);
>                 if (request==0) printf("Error in line %i\n",__LINE__ -1);
>                 zmsg_first(request);
>                 zmsg_next(request);
>                 zframe_t* data_f=zmsg_next(request);
>                 char* data=(char*)zframe_data(data_f);
>                 rc=zmsg_send(&request,workers);
>                 if (request!=0) printf("Error in line %i\n",__LINE__ -1);
>                 if (rc<0)
>                     printf("error sending in router: %i %s\n",errno,strerror(errno));
>             }
>         } while (workerCount>0 && !leave);
>     }
>     public:
>     static void* ctx;
>     static void* lauch_me(void*p) {
>         static_cast<Router*>(p)->goRouter();
>         return 0;
>     }
> };
>
> class Worker {
>     public: volatile bool leave=false;
>     private:
>     void inline goWorker() {
>         void *router=zmq_socket(ctx, ZMQ_SUB);
>         sethwm(ZMQ_RCVHWM,router);
>         char instanceName[10];
>         memcpy(instanceName,"Worker000",10);
>         instanceName[6]+=instanceNumber/100;
>         instanceName[7]+=(instanceNumber%100)/10;
>         instanceName[8]+=instanceNumber%10;
>         int rc=zmq_setsockopt(router,ZMQ_SUBSCRIBE,0,0);
>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>         rc=zmq_connect(router,"tcp://127.0.0.1:10001");
>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>         zmq_pollitem_t items[1];
>         items[0].socket=router;
>         items[0].events=ZMQ_POLLIN;
>         int lastMessageNumber=-1,currentMessageNumber=-1;
>         bool leaveByMessage=false;
>         int OKcount=0;
>         while (!leave) {
>             items[0].revents=0;
>             rc=zmq_poll(items,1,10);
>             if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>             if (rc==0) continue;
>             if (items[0].revents) {
>                 zmsg_t* request=zmsg_recv(items[0].socket);
>                 zframe_t *data_f=zmsg_first(request);
>                 char *data=(char*)zframe_data(data_f);
>                 if (strncmp(data,"Die!",4)==0) {
>                     leave=true; leaveByMessage=true;
>                 } else {
>                     currentMessageNumber= *((int*)data);
>                     if (currentMessageNumber == lastMessageNumber+1)
>                         OKcount++;
>                     else
>                         printf("worker %i expected message %i, got message %i\n",instanceNumber,lastMessageNumber+1,currentMessageNumber);
>                     lastMessageNumber=currentMessageNumber;
>                 }
>             }
>         }
>         printf("worker %i ending after %i good messages%s. last message was %i \n",instanceNumber,OKcount,leaveByMessage?"":" by hard termination",lastMessageNumber);
>         sleep(1);
>         rc=zmq_disconnect(router,"tcp://127.0.0.1:10001");
>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>         rc=zmq_close (router);
>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>     }
>     public:
>     byte instanceNumber;
>     static void* ctx;
>     static void* lauch_me(void*p) {
>         static_cast<Worker*>(p)->goWorker();
>         return 0;
>     }
> };
>
> void* Router::ctx=0;
> void* Worker::ctx=0;
>
> int main(int argc, char**argv) {
>     Router::ctx=zmq_ctx_new(); // create global context
>     zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,100);
>     Worker::ctx=Router::ctx; // use same context in worker and router
>
>     Router *router=new Router();
>     pthread_t r_tid;
>     printf("starting Router ...\n");
>     int ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
>     if (ret!=0) exit(1);
>     bool a=true;
> //    while (a) sleep (1);
>     sleep(1);
>     printf(" thread id %li\n",r_tid);
>
>     Worker *workers[100];
>     pthread_t worker_ids[100];
>     printf("starting %i Workers ...\n",numWorkers);
>     for(int i=0; i<numWorkers; i++) {
>         workers[i]=new Worker();
>         workers[i]->instanceNumber=i;
>         int ret=pthread_create(&worker_ids[i], 0, Worker::lauch_me, workers[i]);
>         if (ret!=0) exit(1);
>     }
>     printf("finished starting Workers ...\n");
>     sleep(10);
>
>     printf("sending messages ...\n");
>
>     void* socket=zmq_socket(Router::ctx,ZMQ_PUB);
>     sethwm(ZMQ_SNDHWM,socket);
>     zmq_connect(socket,"tcp://127.0.0.1:10000");
>     for (int i=0; i<10000; i++) {
> //        if (i%100==0) printf(".");
>         int rc;
>         int errorcount=0;
>         int datas=i;  // stack value; the original malloc(4) was never freed
>         rc=zmq_send(socket,&datas,sizeof(datas),0);
>         if (rc!=4) printf("error sending message %i\n",i);
>     }
>     printf("sending messages done\n");
>     int rc=zmq_send(socket,"Die!",4,0);
>     if (rc!=4) {
>         printf("could not send termination message to workers, killing them by static variable\n");
>         for (int i=0; i<numWorkers; i++) {
>             workers[i]->leave=true;
>         }
>     }
>     sleep(30);
>     router->leave=true;
>     void*result;
>     for (int i=0; i<numWorkers; i++) {
>         workers[i]->leave=true;
>         pthread_join(worker_ids[i],&result);
>     }
>     printf("all own thread have terminated\n");
>     pthread_join(r_tid,&result);
>     printf("router has terminated\n");
> }